reth_provider/providers/static_file/
writer.rs1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15 borrow::Borrow,
16 fmt::Debug,
17 path::{Path, PathBuf},
18 sync::{Arc, Weak},
19 time::Instant,
20};
21use tracing::debug;
22
23#[derive(Debug)]
28pub(crate) struct StaticFileWriters<N> {
29 headers: RwLock<Option<StaticFileProviderRW<N>>>,
30 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
31 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
32}
33
34impl<N> Default for StaticFileWriters<N> {
35 fn default() -> Self {
36 Self {
37 headers: Default::default(),
38 transactions: Default::default(),
39 receipts: Default::default(),
40 }
41 }
42}
43
44impl<N: NodePrimitives> StaticFileWriters<N> {
45 pub(crate) fn get_or_create(
46 &self,
47 segment: StaticFileSegment,
48 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
49 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
50 let mut write_guard = match segment {
51 StaticFileSegment::Headers => self.headers.write(),
52 StaticFileSegment::Transactions => self.transactions.write(),
53 StaticFileSegment::Receipts => self.receipts.write(),
54 };
55
56 if write_guard.is_none() {
57 *write_guard = Some(create_fn()?);
58 }
59
60 Ok(StaticFileProviderRWRefMut(write_guard))
61 }
62
63 pub(crate) fn commit(&self) -> ProviderResult<()> {
64 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
65 let mut writer = writer_lock.write();
66 if let Some(writer) = writer.as_mut() {
67 writer.commit()?;
68 }
69 }
70 Ok(())
71 }
72}
73
74#[derive(Debug)]
76pub struct StaticFileProviderRWRefMut<'a, N>(
77 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
78);
79
80impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 self.0.as_mut().expect("static file writer provider should be init")
84 }
85}
86
87impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
88 type Target = StaticFileProviderRW<N>;
89
90 fn deref(&self) -> &Self::Target {
91 self.0.as_ref().expect("static file writer provider should be init")
93 }
94}
95
96#[derive(Debug)]
97pub struct StaticFileProviderRW<N> {
99 reader: Weak<StaticFileProviderInner<N>>,
103 writer: NippyJarWriter<SegmentHeader>,
105 data_path: PathBuf,
107 buf: Vec<u8>,
109 metrics: Option<Arc<StaticFileProviderMetrics>>,
111 prune_on_commit: Option<(u64, Option<BlockNumber>)>,
114}
115
116impl<N: NodePrimitives> StaticFileProviderRW<N> {
117 pub fn new(
122 segment: StaticFileSegment,
123 block: BlockNumber,
124 reader: Weak<StaticFileProviderInner<N>>,
125 metrics: Option<Arc<StaticFileProviderMetrics>>,
126 ) -> ProviderResult<Self> {
127 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
128 let mut writer = Self {
129 writer,
130 data_path,
131 buf: Vec::with_capacity(100),
132 reader,
133 metrics,
134 prune_on_commit: None,
135 };
136
137 writer.ensure_end_range_consistency()?;
138
139 Ok(writer)
140 }
141
142 fn open(
143 segment: StaticFileSegment,
144 block: u64,
145 reader: Weak<StaticFileProviderInner<N>>,
146 metrics: Option<Arc<StaticFileProviderMetrics>>,
147 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
148 let start = Instant::now();
149
150 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
151
152 let block_range = static_file_provider.find_fixed_range(block);
153 let (jar, path) = match static_file_provider.get_segment_provider_from_block(
154 segment,
155 block_range.start(),
156 None,
157 ) {
158 Ok(provider) => (
159 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
160 provider.data_path().into(),
161 ),
162 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
163 let path = static_file_provider.directory().join(segment.filename(&block_range));
164 (create_jar(segment, &path, block_range), path)
165 }
166 Err(err) => return Err(err),
167 };
168
169 let result = match NippyJarWriter::new(jar) {
170 Ok(writer) => Ok((writer, path)),
171 Err(NippyJarError::FrozenJar) => {
172 Err(ProviderError::FinalizedStaticFile(segment, block))
174 }
175 Err(e) => Err(ProviderError::other(e)),
176 }?;
177
178 if let Some(metrics) = &metrics {
179 metrics.record_segment_operation(
180 segment,
181 StaticFileProviderOperation::OpenWriter,
182 Some(start.elapsed()),
183 );
184 }
185
186 Ok(result)
187 }
188
189 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
198 let expected_rows = if self.user_header().segment().is_headers() {
200 self.user_header().block_len().unwrap_or_default()
201 } else {
202 self.user_header().tx_len().unwrap_or_default()
203 };
204 let pruned_rows = expected_rows - self.writer.rows() as u64;
205 if pruned_rows > 0 {
206 self.user_header_mut().prune(pruned_rows);
207 }
208
209 self.writer.commit().map_err(ProviderError::other)?;
210
211 self.update_index()?;
213 Ok(())
214 }
215
216 pub fn commit(&mut self) -> ProviderResult<()> {
218 let start = Instant::now();
219
220 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
222 match self.writer.user_header().segment() {
223 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
224 StaticFileSegment::Transactions => self
225 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
226 StaticFileSegment::Receipts => {
227 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
228 }
229 }
230 }
231
232 if self.writer.is_dirty() {
233 self.writer.commit().map_err(ProviderError::other)?;
235
236 if let Some(metrics) = &self.metrics {
237 metrics.record_segment_operation(
238 self.writer.user_header().segment(),
239 StaticFileProviderOperation::CommitWriter,
240 Some(start.elapsed()),
241 );
242 }
243
244 debug!(
245 target: "provider::static_file",
246 segment = ?self.writer.user_header().segment(),
247 path = ?self.data_path,
248 duration = ?start.elapsed(),
249 "Commit"
250 );
251
252 self.update_index()?;
253 }
254
255 Ok(())
256 }
257
258 #[cfg(feature = "test-utils")]
262 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
263 let start = Instant::now();
264
265 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
267
268 if let Some(metrics) = &self.metrics {
269 metrics.record_segment_operation(
270 self.writer.user_header().segment(),
271 StaticFileProviderOperation::CommitWriter,
272 Some(start.elapsed()),
273 );
274 }
275
276 debug!(
277 target: "provider::static_file",
278 segment = ?self.writer.user_header().segment(),
279 path = ?self.data_path,
280 duration = ?start.elapsed(),
281 "Commit"
282 );
283
284 self.update_index()?;
285
286 Ok(())
287 }
288
289 fn update_index(&self) -> ProviderResult<()> {
291 let segment_max_block = self
300 .writer
301 .user_header()
302 .block_range()
303 .as_ref()
304 .map(|block_range| block_range.end())
305 .or_else(|| {
306 (self.writer.user_header().expected_block_start() > 0)
307 .then(|| self.writer.user_header().expected_block_start() - 1)
308 });
309
310 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
311 }
312
313 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
318 let segment = self.writer.user_header().segment();
319
320 self.check_next_block_number(expected_block_number)?;
321
322 let start = Instant::now();
323 if let Some(last_block) = self.writer.user_header().block_end() {
324 if last_block == self.writer.user_header().expected_block_end() {
326 self.commit()?;
328
329 let (writer, data_path) =
331 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
332 self.writer = writer;
333 self.data_path = data_path;
334
335 *self.writer.user_header_mut() = SegmentHeader::new(
336 self.reader().find_fixed_range(last_block + 1),
337 None,
338 None,
339 segment,
340 );
341 }
342 }
343
344 self.writer.user_header_mut().increment_block();
345 if let Some(metrics) = &self.metrics {
346 metrics.record_segment_operation(
347 segment,
348 StaticFileProviderOperation::IncrementBlock,
349 Some(start.elapsed()),
350 );
351 }
352
353 Ok(())
354 }
355
356 pub fn next_block_number(&self) -> u64 {
358 self.writer
362 .user_header()
363 .block_end()
364 .map(|b| b + 1)
365 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
366 }
367
368 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
371 let next_static_file_block = self.next_block_number();
372
373 if expected_block_number != next_static_file_block {
374 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
375 self.writer.user_header().segment(),
376 expected_block_number,
377 next_static_file_block,
378 ))
379 }
380 Ok(())
381 }
382
383 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
391 let mut remaining_rows = num_rows;
392 let segment = self.writer.user_header().segment();
393 while remaining_rows > 0 {
394 let len = if segment.is_block_based() {
395 self.writer.user_header().block_len().unwrap_or_default()
396 } else {
397 self.writer.user_header().tx_len().unwrap_or_default()
398 };
399
400 if remaining_rows >= len {
401 let block_start = self.writer.user_header().expected_block_start();
404
405 if block_start != 0 &&
411 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
412 {
413 self.delete_current_and_open_previous()?;
414 } else {
415 self.writer.user_header_mut().prune(len);
417 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
418 break
419 }
420
421 remaining_rows -= len;
422 } else {
423 self.writer.user_header_mut().prune(remaining_rows);
425
426 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
428 remaining_rows = 0;
429 }
430 }
431
432 if let Some(last_block) = last_block {
434 let mut expected_block_start = self.writer.user_header().expected_block_start();
435
436 if num_rows == 0 {
437 while last_block < expected_block_start {
441 self.delete_current_and_open_previous()?;
442 expected_block_start = self.writer.user_header().expected_block_start();
443 }
444 }
445 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
446 }
447
448 self.commit()?;
450
451 Ok(())
452 }
453
454 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
457 let current_path = self.data_path.clone();
458 let (previous_writer, data_path) = Self::open(
459 self.user_header().segment(),
460 self.writer.user_header().expected_block_start() - 1,
461 self.reader.clone(),
462 self.metrics.clone(),
463 )?;
464 self.writer = previous_writer;
465 self.writer.set_dirty();
466 self.data_path = data_path;
467 NippyJar::<SegmentHeader>::load(¤t_path)
468 .map_err(ProviderError::other)?
469 .delete()
470 .map_err(ProviderError::other)?;
471 Ok(())
472 }
473
474 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
476 self.buf.clear();
477 column.to_compact(&mut self.buf);
478
479 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
480 Ok(())
481 }
482
483 fn append_with_tx_number<V: Compact>(
487 &mut self,
488 tx_num: TxNumber,
489 value: V,
490 ) -> ProviderResult<()> {
491 if let Some(range) = self.writer.user_header().tx_range() {
492 let next_tx = range.end() + 1;
493 if next_tx != tx_num {
494 return Err(ProviderError::UnexpectedStaticFileTxNumber(
495 self.writer.user_header().segment(),
496 tx_num,
497 next_tx,
498 ))
499 }
500 self.writer.user_header_mut().increment_tx();
501 } else {
502 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
503 }
504
505 self.append_column(value)?;
506
507 Ok(())
508 }
509
510 pub fn append_header(
517 &mut self,
518 header: &N::BlockHeader,
519 total_difficulty: U256,
520 hash: &BlockHash,
521 ) -> ProviderResult<()>
522 where
523 N::BlockHeader: Compact,
524 {
525 let start = Instant::now();
526 self.ensure_no_queued_prune()?;
527
528 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
529
530 self.increment_block(header.number())?;
531
532 self.append_column(header)?;
533 self.append_column(CompactU256::from(total_difficulty))?;
534 self.append_column(hash)?;
535
536 if let Some(metrics) = &self.metrics {
537 metrics.record_segment_operation(
538 StaticFileSegment::Headers,
539 StaticFileProviderOperation::Append,
540 Some(start.elapsed()),
541 );
542 }
543
544 Ok(())
545 }
546
547 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
554 where
555 N::SignedTx: Compact,
556 {
557 let start = Instant::now();
558 self.ensure_no_queued_prune()?;
559
560 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
561 self.append_with_tx_number(tx_num, tx)?;
562
563 if let Some(metrics) = &self.metrics {
564 metrics.record_segment_operation(
565 StaticFileSegment::Transactions,
566 StaticFileProviderOperation::Append,
567 Some(start.elapsed()),
568 );
569 }
570
571 Ok(())
572 }
573
574 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
581 where
582 N::Receipt: Compact,
583 {
584 let start = Instant::now();
585 self.ensure_no_queued_prune()?;
586
587 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
588 self.append_with_tx_number(tx_num, receipt)?;
589
590 if let Some(metrics) = &self.metrics {
591 metrics.record_segment_operation(
592 StaticFileSegment::Receipts,
593 StaticFileProviderOperation::Append,
594 Some(start.elapsed()),
595 );
596 }
597
598 Ok(())
599 }
600
601 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
605 where
606 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
607 R: Borrow<N::Receipt>,
608 N::Receipt: Compact,
609 {
610 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
611
612 let mut receipts_iter = receipts.into_iter().peekable();
613 if receipts_iter.peek().is_none() {
615 return Ok(None);
616 }
617
618 let start = Instant::now();
619 self.ensure_no_queued_prune()?;
620
621 let mut tx_number = 0;
623 let mut count: u64 = 0;
624
625 for receipt_result in receipts_iter {
626 let (tx_num, receipt) = receipt_result?;
627 self.append_with_tx_number(tx_num, receipt.borrow())?;
628 tx_number = tx_num;
629 count += 1;
630 }
631
632 if let Some(metrics) = &self.metrics {
633 metrics.record_segment_operations(
634 StaticFileSegment::Receipts,
635 StaticFileProviderOperation::Append,
636 count,
637 Some(start.elapsed()),
638 );
639 }
640
641 Ok(Some(tx_number))
642 }
643
644 pub fn prune_transactions(
648 &mut self,
649 to_delete: u64,
650 last_block: BlockNumber,
651 ) -> ProviderResult<()> {
652 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
653 self.queue_prune(to_delete, Some(last_block))
654 }
655
656 pub fn prune_receipts(
660 &mut self,
661 to_delete: u64,
662 last_block: BlockNumber,
663 ) -> ProviderResult<()> {
664 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
665 self.queue_prune(to_delete, Some(last_block))
666 }
667
668 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
670 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
671 self.queue_prune(to_delete, None)
672 }
673
674 fn queue_prune(
679 &mut self,
680 to_delete: u64,
681 last_block: Option<BlockNumber>,
682 ) -> ProviderResult<()> {
683 self.ensure_no_queued_prune()?;
684 self.prune_on_commit = Some((to_delete, last_block));
685 Ok(())
686 }
687
688 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
690 if self.prune_on_commit.is_some() {
691 return Err(ProviderError::other(StaticFileWriterError::new(
692 "Pruning should be committed before appending or pruning more data",
693 )));
694 }
695 Ok(())
696 }
697
698 fn prune_transaction_data(
700 &mut self,
701 to_delete: u64,
702 last_block: BlockNumber,
703 ) -> ProviderResult<()> {
704 let start = Instant::now();
705
706 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
707
708 self.truncate(to_delete, Some(last_block))?;
709
710 if let Some(metrics) = &self.metrics {
711 metrics.record_segment_operation(
712 StaticFileSegment::Transactions,
713 StaticFileProviderOperation::Prune,
714 Some(start.elapsed()),
715 );
716 }
717
718 Ok(())
719 }
720
721 fn prune_receipt_data(
723 &mut self,
724 to_delete: u64,
725 last_block: BlockNumber,
726 ) -> ProviderResult<()> {
727 let start = Instant::now();
728
729 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
730
731 self.truncate(to_delete, Some(last_block))?;
732
733 if let Some(metrics) = &self.metrics {
734 metrics.record_segment_operation(
735 StaticFileSegment::Receipts,
736 StaticFileProviderOperation::Prune,
737 Some(start.elapsed()),
738 );
739 }
740
741 Ok(())
742 }
743
744 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
746 let start = Instant::now();
747
748 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
749
750 self.truncate(to_delete, None)?;
751
752 if let Some(metrics) = &self.metrics {
753 metrics.record_segment_operation(
754 StaticFileSegment::Headers,
755 StaticFileProviderOperation::Prune,
756 Some(start.elapsed()),
757 );
758 }
759
760 Ok(())
761 }
762
763 fn reader(&self) -> StaticFileProvider<N> {
764 Self::upgrade_provider_to_strong_reference(&self.reader)
765 }
766
767 fn upgrade_provider_to_strong_reference(
776 provider: &Weak<StaticFileProviderInner<N>>,
777 ) -> StaticFileProvider<N> {
778 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
779 }
780
781 pub const fn user_header(&self) -> &SegmentHeader {
783 self.writer.user_header()
784 }
785
786 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
788 self.writer.user_header_mut()
789 }
790
791 #[cfg(any(test, feature = "test-utils"))]
793 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
794 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
795 }
796
797 #[cfg(any(test, feature = "test-utils"))]
799 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
800 &mut self.writer
801 }
802}
803
804fn create_jar(
805 segment: StaticFileSegment,
806 path: &Path,
807 expected_block_range: SegmentRangeInclusive,
808) -> NippyJar<SegmentHeader> {
809 let mut jar = NippyJar::new(
810 segment.columns(),
811 path,
812 SegmentHeader::new(expected_block_range, None, None, segment),
813 );
814
815 if segment.is_headers() {
818 jar = jar.with_lz4();
819 }
820
821 jar
822}