1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::{
10 CompactU256, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
11};
12use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
13use reth_node_types::NodePrimitives;
14use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
15use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
16use std::{
17 borrow::Borrow,
18 fmt::Debug,
19 path::{Path, PathBuf},
20 sync::{Arc, Weak},
21 time::Instant,
22};
23use tracing::debug;
24
25#[derive(Debug)]
30pub(crate) struct StaticFileWriters<N> {
31 headers: RwLock<Option<StaticFileProviderRW<N>>>,
32 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
33 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
34 block_meta: RwLock<Option<StaticFileProviderRW<N>>>,
35}
36
37impl<N> Default for StaticFileWriters<N> {
38 fn default() -> Self {
39 Self {
40 headers: Default::default(),
41 transactions: Default::default(),
42 receipts: Default::default(),
43 block_meta: Default::default(),
44 }
45 }
46}
47
48impl<N: NodePrimitives> StaticFileWriters<N> {
49 pub(crate) fn get_or_create(
50 &self,
51 segment: StaticFileSegment,
52 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
53 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
54 let mut write_guard = match segment {
55 StaticFileSegment::Headers => self.headers.write(),
56 StaticFileSegment::Transactions => self.transactions.write(),
57 StaticFileSegment::Receipts => self.receipts.write(),
58 StaticFileSegment::BlockMeta => self.block_meta.write(),
59 };
60
61 if write_guard.is_none() {
62 *write_guard = Some(create_fn()?);
63 }
64
65 Ok(StaticFileProviderRWRefMut(write_guard))
66 }
67
68 pub(crate) fn commit(&self) -> ProviderResult<()> {
69 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
70 let mut writer = writer_lock.write();
71 if let Some(writer) = writer.as_mut() {
72 writer.commit()?;
73 }
74 }
75 Ok(())
76 }
77}
78
79#[derive(Debug)]
81pub struct StaticFileProviderRWRefMut<'a, N>(
82 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
83);
84
85impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 self.0.as_mut().expect("static file writer provider should be init")
89 }
90}
91
92impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
93 type Target = StaticFileProviderRW<N>;
94
95 fn deref(&self) -> &Self::Target {
96 self.0.as_ref().expect("static file writer provider should be init")
98 }
99}
100
101#[derive(Debug)]
102pub struct StaticFileProviderRW<N> {
104 reader: Weak<StaticFileProviderInner<N>>,
108 writer: NippyJarWriter<SegmentHeader>,
110 data_path: PathBuf,
112 buf: Vec<u8>,
114 metrics: Option<Arc<StaticFileProviderMetrics>>,
116 prune_on_commit: Option<(u64, Option<BlockNumber>)>,
119}
120
121impl<N: NodePrimitives> StaticFileProviderRW<N> {
122 pub fn new(
127 segment: StaticFileSegment,
128 block: BlockNumber,
129 reader: Weak<StaticFileProviderInner<N>>,
130 metrics: Option<Arc<StaticFileProviderMetrics>>,
131 ) -> ProviderResult<Self> {
132 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
133 let mut writer = Self {
134 writer,
135 data_path,
136 buf: Vec::with_capacity(100),
137 reader,
138 metrics,
139 prune_on_commit: None,
140 };
141
142 writer.ensure_end_range_consistency()?;
143
144 Ok(writer)
145 }
146
147 fn open(
148 segment: StaticFileSegment,
149 block: u64,
150 reader: Weak<StaticFileProviderInner<N>>,
151 metrics: Option<Arc<StaticFileProviderMetrics>>,
152 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
153 let start = Instant::now();
154
155 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
156
157 let block_range = static_file_provider.find_fixed_range(block);
158 let (jar, path) = match static_file_provider.get_segment_provider_from_block(
159 segment,
160 block_range.start(),
161 None,
162 ) {
163 Ok(provider) => (
164 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
165 provider.data_path().into(),
166 ),
167 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
168 let path = static_file_provider.directory().join(segment.filename(&block_range));
169 (create_jar(segment, &path, block_range), path)
170 }
171 Err(err) => return Err(err),
172 };
173
174 let result = match NippyJarWriter::new(jar) {
175 Ok(writer) => Ok((writer, path)),
176 Err(NippyJarError::FrozenJar) => {
177 Err(ProviderError::FinalizedStaticFile(segment, block))
179 }
180 Err(e) => Err(ProviderError::other(e)),
181 }?;
182
183 if let Some(metrics) = &metrics {
184 metrics.record_segment_operation(
185 segment,
186 StaticFileProviderOperation::OpenWriter,
187 Some(start.elapsed()),
188 );
189 }
190
191 Ok(result)
192 }
193
194 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
203 let expected_rows = if self.user_header().segment().is_headers() {
205 self.user_header().block_len().unwrap_or_default()
206 } else {
207 self.user_header().tx_len().unwrap_or_default()
208 };
209 let pruned_rows = expected_rows - self.writer.rows() as u64;
210 if pruned_rows > 0 {
211 self.user_header_mut().prune(pruned_rows);
212 }
213
214 self.writer.commit().map_err(ProviderError::other)?;
215
216 self.update_index()?;
218 Ok(())
219 }
220
221 pub fn commit(&mut self) -> ProviderResult<()> {
223 let start = Instant::now();
224
225 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
227 match self.writer.user_header().segment() {
228 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
229 StaticFileSegment::Transactions => self
230 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
231 StaticFileSegment::Receipts => {
232 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
233 }
234 StaticFileSegment::BlockMeta => todo!(),
235 }
236 }
237
238 if self.writer.is_dirty() {
239 self.writer.commit().map_err(ProviderError::other)?;
241
242 if let Some(metrics) = &self.metrics {
243 metrics.record_segment_operation(
244 self.writer.user_header().segment(),
245 StaticFileProviderOperation::CommitWriter,
246 Some(start.elapsed()),
247 );
248 }
249
250 debug!(
251 target: "provider::static_file",
252 segment = ?self.writer.user_header().segment(),
253 path = ?self.data_path,
254 duration = ?start.elapsed(),
255 "Commit"
256 );
257
258 self.update_index()?;
259 }
260
261 Ok(())
262 }
263
264 #[cfg(feature = "test-utils")]
268 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
269 let start = Instant::now();
270
271 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
273
274 if let Some(metrics) = &self.metrics {
275 metrics.record_segment_operation(
276 self.writer.user_header().segment(),
277 StaticFileProviderOperation::CommitWriter,
278 Some(start.elapsed()),
279 );
280 }
281
282 debug!(
283 target: "provider::static_file",
284 segment = ?self.writer.user_header().segment(),
285 path = ?self.data_path,
286 duration = ?start.elapsed(),
287 "Commit"
288 );
289
290 self.update_index()?;
291
292 Ok(())
293 }
294
295 fn update_index(&self) -> ProviderResult<()> {
297 let segment_max_block = self
306 .writer
307 .user_header()
308 .block_range()
309 .as_ref()
310 .map(|block_range| block_range.end())
311 .or_else(|| {
312 (self.writer.user_header().expected_block_start() > 0)
313 .then(|| self.writer.user_header().expected_block_start() - 1)
314 });
315
316 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
317 }
318
319 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
324 let segment = self.writer.user_header().segment();
325
326 self.check_next_block_number(expected_block_number)?;
327
328 let start = Instant::now();
329 if let Some(last_block) = self.writer.user_header().block_end() {
330 if last_block == self.writer.user_header().expected_block_end() {
332 self.commit()?;
334
335 let (writer, data_path) =
337 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
338 self.writer = writer;
339 self.data_path = data_path;
340
341 *self.writer.user_header_mut() = SegmentHeader::new(
342 self.reader().find_fixed_range(last_block + 1),
343 None,
344 None,
345 segment,
346 );
347 }
348 }
349
350 self.writer.user_header_mut().increment_block();
351 if let Some(metrics) = &self.metrics {
352 metrics.record_segment_operation(
353 segment,
354 StaticFileProviderOperation::IncrementBlock,
355 Some(start.elapsed()),
356 );
357 }
358
359 Ok(())
360 }
361
362 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
365 let next_static_file_block = self
369 .writer
370 .user_header()
371 .block_end()
372 .map(|b| b + 1)
373 .unwrap_or_else(|| self.writer.user_header().expected_block_start());
374
375 if expected_block_number != next_static_file_block {
376 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
377 self.writer.user_header().segment(),
378 expected_block_number,
379 next_static_file_block,
380 ))
381 }
382 Ok(())
383 }
384
385 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
393 let mut remaining_rows = num_rows;
394 let segment = self.writer.user_header().segment();
395 while remaining_rows > 0 {
396 let len = if segment.is_block_based() {
397 self.writer.user_header().block_len().unwrap_or_default()
398 } else {
399 self.writer.user_header().tx_len().unwrap_or_default()
400 };
401
402 if remaining_rows >= len {
403 let block_start = self.writer.user_header().expected_block_start();
406
407 if block_start != 0 &&
413 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
414 {
415 self.delete_current_and_open_previous()?;
416 } else {
417 self.writer.user_header_mut().prune(len);
419 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
420 break
421 }
422
423 remaining_rows -= len;
424 } else {
425 self.writer.user_header_mut().prune(remaining_rows);
427
428 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
430 remaining_rows = 0;
431 }
432 }
433
434 if let Some(last_block) = last_block {
436 let mut expected_block_start = self.writer.user_header().expected_block_start();
437
438 if num_rows == 0 {
439 while last_block < expected_block_start {
443 self.delete_current_and_open_previous()?;
444 expected_block_start = self.writer.user_header().expected_block_start();
445 }
446 }
447 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
448 }
449
450 self.commit()?;
452
453 Ok(())
454 }
455
456 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
459 let current_path = self.data_path.clone();
460 let (previous_writer, data_path) = Self::open(
461 self.user_header().segment(),
462 self.writer.user_header().expected_block_start() - 1,
463 self.reader.clone(),
464 self.metrics.clone(),
465 )?;
466 self.writer = previous_writer;
467 self.writer.set_dirty();
468 self.data_path = data_path;
469 NippyJar::<SegmentHeader>::load(¤t_path)
470 .map_err(ProviderError::other)?
471 .delete()
472 .map_err(ProviderError::other)?;
473 Ok(())
474 }
475
476 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
478 self.buf.clear();
479 column.to_compact(&mut self.buf);
480
481 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
482 Ok(())
483 }
484
485 fn append_with_tx_number<V: Compact>(
489 &mut self,
490 tx_num: TxNumber,
491 value: V,
492 ) -> ProviderResult<()> {
493 if let Some(range) = self.writer.user_header().tx_range() {
494 let next_tx = range.end() + 1;
495 if next_tx != tx_num {
496 return Err(ProviderError::UnexpectedStaticFileTxNumber(
497 self.writer.user_header().segment(),
498 tx_num,
499 next_tx,
500 ))
501 }
502 self.writer.user_header_mut().increment_tx();
503 } else {
504 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
505 }
506
507 self.append_column(value)?;
508
509 Ok(())
510 }
511
512 pub fn append_header(
519 &mut self,
520 header: &N::BlockHeader,
521 total_difficulty: U256,
522 hash: &BlockHash,
523 ) -> ProviderResult<()>
524 where
525 N::BlockHeader: Compact,
526 {
527 let start = Instant::now();
528 self.ensure_no_queued_prune()?;
529
530 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
531
532 self.increment_block(header.number())?;
533
534 self.append_column(header)?;
535 self.append_column(CompactU256::from(total_difficulty))?;
536 self.append_column(hash)?;
537
538 if let Some(metrics) = &self.metrics {
539 metrics.record_segment_operation(
540 StaticFileSegment::Headers,
541 StaticFileProviderOperation::Append,
542 Some(start.elapsed()),
543 );
544 }
545
546 Ok(())
547 }
548
549 pub fn append_eth_block_meta(
554 &mut self,
555 body_indices: &StoredBlockBodyIndices,
556 ommers: &StoredBlockOmmers<N::BlockHeader>,
557 withdrawals: &StoredBlockWithdrawals,
558 expected_block_number: BlockNumber,
559 ) -> ProviderResult<()>
560 where
561 N::BlockHeader: Compact,
562 {
563 self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
564 }
565
566 pub fn append_block_meta<F1, F2>(
571 &mut self,
572 body_indices: &StoredBlockBodyIndices,
573 field1: &F1,
574 field2: &F2,
575 expected_block_number: BlockNumber,
576 ) -> ProviderResult<()>
577 where
578 N::BlockHeader: Compact,
579 F1: Compact,
580 F2: Compact,
581 {
582 let start = Instant::now();
583 self.ensure_no_queued_prune()?;
584
585 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::BlockMeta);
586
587 self.increment_block(expected_block_number)?;
588
589 self.append_column(body_indices)?;
590 self.append_column(field1)?;
591 self.append_column(field2)?;
592
593 if let Some(metrics) = &self.metrics {
594 metrics.record_segment_operation(
595 StaticFileSegment::BlockMeta,
596 StaticFileProviderOperation::Append,
597 Some(start.elapsed()),
598 );
599 }
600
601 Ok(())
602 }
603
604 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
611 where
612 N::SignedTx: Compact,
613 {
614 let start = Instant::now();
615 self.ensure_no_queued_prune()?;
616
617 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
618 self.append_with_tx_number(tx_num, tx)?;
619
620 if let Some(metrics) = &self.metrics {
621 metrics.record_segment_operation(
622 StaticFileSegment::Transactions,
623 StaticFileProviderOperation::Append,
624 Some(start.elapsed()),
625 );
626 }
627
628 Ok(())
629 }
630
631 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
638 where
639 N::Receipt: Compact,
640 {
641 let start = Instant::now();
642 self.ensure_no_queued_prune()?;
643
644 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
645 self.append_with_tx_number(tx_num, receipt)?;
646
647 if let Some(metrics) = &self.metrics {
648 metrics.record_segment_operation(
649 StaticFileSegment::Receipts,
650 StaticFileProviderOperation::Append,
651 Some(start.elapsed()),
652 );
653 }
654
655 Ok(())
656 }
657
658 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
662 where
663 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
664 R: Borrow<N::Receipt>,
665 N::Receipt: Compact,
666 {
667 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
668
669 let mut receipts_iter = receipts.into_iter().peekable();
670 if receipts_iter.peek().is_none() {
672 return Ok(None);
673 }
674
675 let start = Instant::now();
676 self.ensure_no_queued_prune()?;
677
678 let mut tx_number = 0;
680 let mut count: u64 = 0;
681
682 for receipt_result in receipts_iter {
683 let (tx_num, receipt) = receipt_result?;
684 self.append_with_tx_number(tx_num, receipt.borrow())?;
685 tx_number = tx_num;
686 count += 1;
687 }
688
689 if let Some(metrics) = &self.metrics {
690 metrics.record_segment_operations(
691 StaticFileSegment::Receipts,
692 StaticFileProviderOperation::Append,
693 count,
694 Some(start.elapsed()),
695 );
696 }
697
698 Ok(Some(tx_number))
699 }
700
701 pub fn prune_transactions(
705 &mut self,
706 to_delete: u64,
707 last_block: BlockNumber,
708 ) -> ProviderResult<()> {
709 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
710 self.queue_prune(to_delete, Some(last_block))
711 }
712
713 pub fn prune_receipts(
717 &mut self,
718 to_delete: u64,
719 last_block: BlockNumber,
720 ) -> ProviderResult<()> {
721 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
722 self.queue_prune(to_delete, Some(last_block))
723 }
724
725 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
727 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
728 self.queue_prune(to_delete, None)
729 }
730
731 pub fn prune_block_meta(&mut self, to_delete: u64) -> ProviderResult<()> {
733 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::BlockMeta);
734 self.queue_prune(to_delete, None)
735 }
736
737 fn queue_prune(
742 &mut self,
743 to_delete: u64,
744 last_block: Option<BlockNumber>,
745 ) -> ProviderResult<()> {
746 self.ensure_no_queued_prune()?;
747 self.prune_on_commit = Some((to_delete, last_block));
748 Ok(())
749 }
750
751 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
753 if self.prune_on_commit.is_some() {
754 return Err(ProviderError::other(StaticFileWriterError::new(
755 "Pruning should be committed before appending or pruning more data",
756 )));
757 }
758 Ok(())
759 }
760
761 fn prune_transaction_data(
763 &mut self,
764 to_delete: u64,
765 last_block: BlockNumber,
766 ) -> ProviderResult<()> {
767 let start = Instant::now();
768
769 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
770
771 self.truncate(to_delete, Some(last_block))?;
772
773 if let Some(metrics) = &self.metrics {
774 metrics.record_segment_operation(
775 StaticFileSegment::Transactions,
776 StaticFileProviderOperation::Prune,
777 Some(start.elapsed()),
778 );
779 }
780
781 Ok(())
782 }
783
784 fn prune_receipt_data(
786 &mut self,
787 to_delete: u64,
788 last_block: BlockNumber,
789 ) -> ProviderResult<()> {
790 let start = Instant::now();
791
792 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
793
794 self.truncate(to_delete, Some(last_block))?;
795
796 if let Some(metrics) = &self.metrics {
797 metrics.record_segment_operation(
798 StaticFileSegment::Receipts,
799 StaticFileProviderOperation::Prune,
800 Some(start.elapsed()),
801 );
802 }
803
804 Ok(())
805 }
806
807 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
809 let start = Instant::now();
810
811 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
812
813 self.truncate(to_delete, None)?;
814
815 if let Some(metrics) = &self.metrics {
816 metrics.record_segment_operation(
817 StaticFileSegment::Headers,
818 StaticFileProviderOperation::Prune,
819 Some(start.elapsed()),
820 );
821 }
822
823 Ok(())
824 }
825
826 fn reader(&self) -> StaticFileProvider<N> {
827 Self::upgrade_provider_to_strong_reference(&self.reader)
828 }
829
830 fn upgrade_provider_to_strong_reference(
839 provider: &Weak<StaticFileProviderInner<N>>,
840 ) -> StaticFileProvider<N> {
841 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
842 }
843
844 pub const fn user_header(&self) -> &SegmentHeader {
846 self.writer.user_header()
847 }
848
849 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
851 self.writer.user_header_mut()
852 }
853
854 #[cfg(any(test, feature = "test-utils"))]
856 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
857 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
858 }
859
860 #[cfg(any(test, feature = "test-utils"))]
862 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
863 &mut self.writer
864 }
865}
866
867fn create_jar(
868 segment: StaticFileSegment,
869 path: &Path,
870 expected_block_range: SegmentRangeInclusive,
871) -> NippyJar<SegmentHeader> {
872 let mut jar = NippyJar::new(
873 segment.columns(),
874 path,
875 SegmentHeader::new(expected_block_range, None, None, segment),
876 );
877
878 if segment.is_headers() {
881 jar = jar.with_lz4();
882 }
883
884 jar
885}