1use crate::{find_fixed_range, BlockNumber, Compression};
2use alloc::{format, string::String};
3use alloy_primitives::TxNumber;
4use core::{
5 ops::{Range, RangeInclusive},
6 str::FromStr,
7};
8use reth_stages_types::StageId;
9use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Deserializer, Serialize, Serializer};
10use strum::{EnumIs, EnumString};
11
/// Kinds of data segment that can be stored in a static file.
///
/// The declaration order of the variants is significant: it drives the derived `Ord` /
/// `PartialOrd` implementations, and [`StaticFileSegment::iter`] lists the variants in this same
/// order.
///
/// String parsing (`EnumString`) uses kebab-case names, i.e. the same names returned by
/// [`StaticFileSegment::as_str`].
#[derive(
    Debug,
    Copy,
    Clone,
    Eq,
    PartialEq,
    Hash,
    Ord,
    PartialOrd,
    EnumString,
    derive_more::Display,
    EnumIs,
    Serialize,
    Deserialize,
    fixed_map::Key,
)]
#[strum(serialize_all = "kebab-case")]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum StaticFileSegment {
    /// Headers segment (`"headers"`); block based, mapped to `StageId::Headers`.
    Headers,
    /// Transactions segment (`"transactions"`); transaction based, mapped to `StageId::Bodies`.
    Transactions,
    /// Receipts segment (`"receipts"`); transaction based, mapped to `StageId::Execution`.
    Receipts,
    /// Transaction senders segment (`"transaction-senders"`); transaction based, mapped to
    /// `StageId::SenderRecovery`.
    TransactionSenders,
    /// Account changesets segment (`"account-change-sets"`); change based, mapped to
    /// `StageId::Execution`.
    AccountChangeSets,
    /// Storage changesets segment (`"storage-change-sets"`); change based, mapped to
    /// `StageId::Execution`.
    StorageChangeSets,
}
65
66impl StaticFileSegment {
67 pub const fn as_str(&self) -> &'static str {
69 match self {
75 Self::Headers => "headers",
76 Self::Transactions => "transactions",
77 Self::Receipts => "receipts",
78 Self::TransactionSenders => "transaction-senders",
79 Self::AccountChangeSets => "account-change-sets",
80 Self::StorageChangeSets => "storage-change-sets",
81 }
82 }
83
84 pub const fn as_short_str(&self) -> &'static str {
88 match self {
89 Self::Headers => "headers",
90 Self::Transactions => "transactions",
91 Self::Receipts => "receipts",
92 Self::TransactionSenders => "tx-senders",
93 Self::AccountChangeSets => "account-changes",
94 Self::StorageChangeSets => "storage-changes",
95 }
96 }
97
98 pub fn iter() -> impl Iterator<Item = Self> {
100 [
102 Self::Headers,
103 Self::Transactions,
104 Self::Receipts,
105 Self::TransactionSenders,
106 Self::AccountChangeSets,
107 Self::StorageChangeSets,
108 ]
109 .into_iter()
110 }
111
112 pub const fn config(&self) -> SegmentConfig {
114 SegmentConfig { compression: Compression::Lz4 }
115 }
116
117 pub const fn columns(&self) -> usize {
119 match self {
120 Self::Headers => 3,
121 Self::Transactions |
122 Self::Receipts |
123 Self::TransactionSenders |
124 Self::AccountChangeSets |
125 Self::StorageChangeSets => 1,
126 }
127 }
128
129 pub fn filename(&self, block_range: &SegmentRangeInclusive) -> String {
131 format!("static_file_{}_{}_{}", self.as_str(), block_range.start(), block_range.end())
134 }
135
136 pub fn filename_with_configuration(
138 &self,
139 compression: Compression,
140 block_range: &SegmentRangeInclusive,
141 ) -> String {
142 let prefix = self.filename(block_range);
143
144 let filters_name = "none";
145
146 format!("{prefix}_{}_{}", filters_name, compression.as_ref())
149 }
150
151 pub fn parse_filename(name: &str) -> Option<(Self, SegmentRangeInclusive)> {
168 let mut parts = name.split('_');
169 if !(parts.next() == Some("static") && parts.next() == Some("file")) {
170 return None
171 }
172
173 let segment = Self::from_str(parts.next()?).ok()?;
174 let (block_start, block_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
175
176 if block_start > block_end {
177 return None
178 }
179
180 Some((segment, SegmentRangeInclusive::new(block_start, block_end)))
181 }
182
183 pub const fn is_tx_based(&self) -> bool {
185 match self {
186 Self::Receipts | Self::Transactions | Self::TransactionSenders => true,
187 Self::Headers | Self::AccountChangeSets | Self::StorageChangeSets => false,
188 }
189 }
190
191 pub const fn is_change_based(&self) -> bool {
193 match self {
194 Self::AccountChangeSets | Self::StorageChangeSets => true,
195 Self::Receipts | Self::Transactions | Self::Headers | Self::TransactionSenders => false,
196 }
197 }
198
199 pub const fn is_block_based(&self) -> bool {
201 match self {
202 Self::Headers => true,
203 Self::Receipts |
204 Self::Transactions |
205 Self::TransactionSenders |
206 Self::AccountChangeSets |
207 Self::StorageChangeSets => false,
208 }
209 }
210
211 pub const fn is_block_or_change_based(&self) -> bool {
214 self.is_block_based() || self.is_change_based()
215 }
216
217 pub const fn to_stage_id(&self) -> StageId {
219 match self {
220 Self::Headers => StageId::Headers,
221 Self::Transactions => StageId::Bodies,
222 Self::Receipts | Self::AccountChangeSets | Self::StorageChangeSets => {
223 StageId::Execution
224 }
225 Self::TransactionSenders => StageId::SenderRecovery,
226 }
227 }
228}
229
/// An offset together with a number of changes, describing the half-open row range
/// `offset..offset + num_changes` (see `ChangesetOffset::changeset_range`).
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone)]
pub struct ChangesetOffset {
    /// Start of the range returned by `changeset_range`.
    offset: u64,

    /// Number of changes, i.e. the length of the range returned by `changeset_range`.
    num_changes: u64,
}
239
240impl ChangesetOffset {
241 pub const fn new(offset: u64, num_changes: u64) -> Self {
243 Self { offset, num_changes }
244 }
245
246 pub const fn offset(&self) -> u64 {
248 self.offset
249 }
250
251 pub const fn num_changes(&self) -> u64 {
253 self.num_changes
254 }
255
256 pub const fn changeset_range(&self) -> Range<u64> {
258 self.offset..(self.offset + self.num_changes)
259 }
260
261 pub const fn increment_num_changes(&mut self) {
263 self.num_changes += 1;
264 }
265}
266
/// Header describing the contents of a static file segment.
///
/// `Serialize` / `Deserialize` are implemented by hand: `changeset_offsets_len` is only written
/// and read for change-based segments.
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct SegmentHeader {
    /// Block range the file is expected to cover. Its length is used as the number of blocks per
    /// file by `set_expected_block_start`.
    expected_block_range: SegmentRangeInclusive,
    /// Block range currently tracked by the header. `None` until `increment_block` or
    /// `set_block_range` is called, or after `prune` removes every block.
    block_range: Option<SegmentRangeInclusive>,
    /// Transaction range currently tracked by the header. `None` until `increment_tx` or
    /// `set_tx_range` is called, or after `prune` removes every transaction.
    tx_range: Option<SegmentRangeInclusive>,
    /// Segment kind of the file.
    segment: StaticFileSegment,
    /// Number of changeset offsets. Only serialized for change-based segments; deserialized as
    /// `0` for every other segment.
    changeset_offsets_len: u64,
}
284
/// Serde sequence visitor that rebuilds a [`SegmentHeader`], reading the trailing changeset
/// offsets length only when the segment is change based.
struct SegmentHeaderVisitor;
286
287impl<'de> Visitor<'de> for SegmentHeaderVisitor {
288 type Value = SegmentHeader;
289
290 fn expecting(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
291 formatter.write_str("a header struct with 4 or 5 fields")
292 }
293
294 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
295 where
296 A: serde::de::SeqAccess<'de>,
297 {
298 let expected_block_range =
300 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
301
302 let block_range =
303 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
304
305 let tx_range =
306 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
307
308 let segment: StaticFileSegment =
309 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
310
311 let changeset_offsets_len = if segment.is_change_based() {
312 match seq.next_element::<u64>()? {
314 Some(len) => len,
315 None => {
316 return Err(serde::de::Error::custom(
317 "changeset_offsets_len should exist for changeset static files",
318 ))
319 }
320 }
321 } else {
322 0
323 };
324
325 Ok(SegmentHeader {
326 expected_block_range,
327 block_range,
328 tx_range,
329 segment,
330 changeset_offsets_len,
331 })
332 }
333}
334
335impl<'de> Deserialize<'de> for SegmentHeader {
336 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
337 where
338 D: Deserializer<'de>,
339 {
340 const FIELDS: &[&str] =
344 &["expected_block_range", "block_range", "tx_range", "segment", "changeset_offsets"];
345
346 deserializer.deserialize_struct("SegmentHeader", FIELDS, SegmentHeaderVisitor)
347 }
348}
349
350impl Serialize for SegmentHeader {
351 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
352 where
353 S: Serializer,
354 {
355 let len = if self.segment.is_change_based() { 5 } else { 4 };
357
358 let mut state = serializer.serialize_struct("SegmentHeader", len)?;
359 state.serialize_field("expected_block_range", &self.expected_block_range)?;
360 state.serialize_field("block_range", &self.block_range)?;
361 state.serialize_field("tx_range", &self.tx_range)?;
362 state.serialize_field("segment", &self.segment)?;
363
364 if self.segment.is_change_based() {
365 state.serialize_field("changeset_offsets", &self.changeset_offsets_len)?;
366 }
367
368 state.end()
369 }
370}
371
372impl SegmentHeader {
373 pub const fn new(
375 expected_block_range: SegmentRangeInclusive,
376 block_range: Option<SegmentRangeInclusive>,
377 tx_range: Option<SegmentRangeInclusive>,
378 segment: StaticFileSegment,
379 ) -> Self {
380 Self { expected_block_range, block_range, tx_range, segment, changeset_offsets_len: 0 }
381 }
382
383 pub const fn segment(&self) -> StaticFileSegment {
385 self.segment
386 }
387
388 pub const fn expected_block_range(&self) -> SegmentRangeInclusive {
390 self.expected_block_range
391 }
392
393 pub const fn block_range(&self) -> Option<SegmentRangeInclusive> {
395 self.block_range
396 }
397
398 pub const fn tx_range(&self) -> Option<SegmentRangeInclusive> {
400 self.tx_range
401 }
402
403 pub const fn changeset_offsets_len(&self) -> u64 {
406 self.changeset_offsets_len
407 }
408
409 pub const fn set_changeset_offsets_len(&mut self, len: u64) {
411 self.changeset_offsets_len = len;
412 }
413
414 pub const fn increment_changeset_offsets_len(&mut self) {
416 self.changeset_offsets_len += 1;
417 }
418
419 pub const fn expected_block_start(&self) -> BlockNumber {
421 self.expected_block_range.start()
422 }
423
424 pub const fn set_expected_block_start(&mut self, block: BlockNumber) {
432 let blocks_per_file =
433 self.expected_block_range.end() - self.expected_block_range.start() + 1;
434 let file_range = find_fixed_range(block, blocks_per_file);
435 self.expected_block_range = SegmentRangeInclusive::new(block, file_range.end());
436 }
437
438 pub const fn expected_block_end(&self) -> BlockNumber {
440 self.expected_block_range.end()
441 }
442
443 pub fn block_start(&self) -> Option<BlockNumber> {
445 self.block_range.as_ref().map(|b| b.start())
446 }
447
448 pub fn block_end(&self) -> Option<BlockNumber> {
450 self.block_range.as_ref().map(|b| b.end())
451 }
452
453 pub fn tx_start(&self) -> Option<TxNumber> {
455 self.tx_range.as_ref().map(|t| t.start())
456 }
457
458 pub fn tx_end(&self) -> Option<TxNumber> {
460 self.tx_range.as_ref().map(|t| t.end())
461 }
462
463 pub fn tx_len(&self) -> Option<u64> {
465 self.tx_range.as_ref().map(|r| r.len())
466 }
467
468 pub fn block_len(&self) -> Option<u64> {
470 self.block_range.as_ref().map(|r| r.len())
471 }
472
473 pub const fn increment_block(&mut self) -> BlockNumber {
475 let block_num = if let Some(block_range) = &mut self.block_range {
476 block_range.end += 1;
477 block_range.end
478 } else {
479 self.block_range = Some(SegmentRangeInclusive::new(
480 self.expected_block_start(),
481 self.expected_block_start(),
482 ));
483 self.expected_block_start()
484 };
485
486 if self.segment.is_change_based() {
489 self.changeset_offsets_len += 1;
490 }
491
492 block_num
493 }
494
495 pub const fn increment_tx(&mut self) {
497 if self.segment.is_tx_based() {
498 if let Some(tx_range) = &mut self.tx_range {
499 tx_range.end += 1;
500 } else {
501 self.tx_range = Some(SegmentRangeInclusive::new(0, 0));
502 }
503 }
504 }
505
506 pub const fn prune(&mut self, num: u64) {
511 if self.segment.is_block_or_change_based() {
513 if let Some(range) = &mut self.block_range {
514 if num > range.end - range.start {
515 self.block_range = None;
516 if self.segment.is_change_based() {
518 self.changeset_offsets_len = 0;
519 }
520 } else {
521 let old_end = range.end;
522 range.end = range.end.saturating_sub(num);
523
524 if self.segment.is_change_based() {
526 let blocks_to_remove = old_end - range.end;
528 self.changeset_offsets_len =
529 self.changeset_offsets_len.saturating_sub(blocks_to_remove);
530 }
531 }
532 };
533 } else if let Some(range) = &mut self.tx_range {
534 if num > range.end - range.start {
535 self.tx_range = None;
536 } else {
537 range.end = range.end.saturating_sub(num);
538 }
539 }
540 }
541
542 pub const fn set_block_range(&mut self, block_start: BlockNumber, block_end: BlockNumber) {
544 if let Some(block_range) = &mut self.block_range {
545 block_range.start = block_start;
546 block_range.end = block_end;
547 } else {
548 self.block_range = Some(SegmentRangeInclusive::new(block_start, block_end))
549 }
550 }
551
552 pub const fn sync_changeset_offsets(&mut self) {
558 if !self.segment.is_change_based() {
559 return;
560 }
561
562 if let Some(block_range) = &self.block_range {
563 let expected_len = block_range.end - block_range.start + 1;
564 if self.changeset_offsets_len > expected_len {
565 self.changeset_offsets_len = expected_len;
566 }
567 } else {
568 self.changeset_offsets_len = 0;
570 }
571 }
572
573 pub const fn set_tx_range(&mut self, tx_start: TxNumber, tx_end: TxNumber) {
575 if let Some(tx_range) = &mut self.tx_range {
576 tx_range.start = tx_start;
577 tx_range.end = tx_end;
578 } else {
579 self.tx_range = Some(SegmentRangeInclusive::new(tx_start, tx_end))
580 }
581 }
582
583 pub fn start(&self) -> Option<u64> {
585 if self.segment.is_change_based() {
586 return Some(0)
587 }
588
589 if self.segment.is_block_based() {
590 return self.block_start()
591 }
592 self.tx_start()
593 }
594
595 pub fn changeset_offset_index(&self, block: BlockNumber) -> Option<u64> {
601 let block_range = self.block_range()?;
602 if !block_range.contains(block) {
603 return None;
604 }
605
606 let index = block - block_range.start();
607 if index >= self.changeset_offsets_len {
608 return None;
609 }
610
611 Some(index)
612 }
613}
614
/// Configuration of a static file segment, as returned by `StaticFileSegment::config`.
#[derive(Debug, Clone, Copy)]
pub struct SegmentConfig {
    /// Compression selected for the segment.
    pub compression: Compression,
}
621
/// An inclusive `start..=end` range of `u64` values (block or transaction numbers).
///
/// It is a plain, `Copy`, serde-serializable pair of bounds. `SegmentRangeInclusive::new` does
/// not validate that `start <= end`.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct SegmentRangeInclusive {
    /// First value of the range (inclusive).
    start: u64,
    /// Last value of the range (inclusive).
    end: u64,
}
630
631impl SegmentRangeInclusive {
632 pub const fn new(start: u64, end: u64) -> Self {
634 Self { start, end }
635 }
636
637 pub const fn start(&self) -> u64 {
639 self.start
640 }
641
642 pub const fn end(&self) -> u64 {
644 self.end
645 }
646
647 pub const fn len(&self) -> u64 {
649 self.end.saturating_sub(self.start).saturating_add(1)
650 }
651
652 pub const fn is_empty(&self) -> bool {
654 self.start > self.end
655 }
656
657 pub fn contains(&self, number: u64) -> bool {
659 (self.start..=self.end).contains(&number)
660 }
661}
662
663impl core::fmt::Display for SegmentRangeInclusive {
664 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
665 write!(f, "{}..={}", self.start, self.end)
666 }
667}
668
669impl From<RangeInclusive<u64>> for SegmentRangeInclusive {
670 fn from(value: RangeInclusive<u64>) -> Self {
671 Self { start: *value.start(), end: *value.end() }
672 }
673}
674
675impl From<&SegmentRangeInclusive> for RangeInclusive<u64> {
676 fn from(value: &SegmentRangeInclusive) -> Self {
677 value.start()..=value.end()
678 }
679}
680
681impl From<SegmentRangeInclusive> for RangeInclusive<u64> {
682 fn from(value: SegmentRangeInclusive) -> Self {
683 (&value).into()
684 }
685}
686
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::Bytes;
    use reth_nippy_jar::NippyJar;
    use std::env::temp_dir;

    /// Checks file name generation (with and without configuration suffix) and that
    /// `parse_filename` recovers the segment and block range from every generated name.
    #[test]
    fn test_filename() {
        // (segment, block range, expected file name, optional compression for the suffixed form)
        let test_vectors = [
            (StaticFileSegment::Headers, 2..=30, "static_file_headers_2_30", None),
            (StaticFileSegment::Receipts, 30..=300, "static_file_receipts_30_300", None),
            (
                StaticFileSegment::Transactions,
                1_123_233..=11_223_233,
                "static_file_transactions_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::AccountChangeSets,
                1_123_233..=11_223_233,
                "static_file_account-change-sets_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::StorageChangeSets,
                1_123_233..=11_223_233,
                "static_file_storage-change-sets_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_lz4",
                Some(Compression::Lz4),
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_zstd",
                Some(Compression::Zstd),
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_zstd-dict",
                Some(Compression::ZstdWithDictionary),
            ),
        ];

        for (segment, block_range, filename, compression) in test_vectors {
            let block_range: SegmentRangeInclusive = block_range.into();
            if let Some(compression) = compression {
                assert_eq!(
                    segment.filename_with_configuration(compression, &block_range),
                    filename
                );
            } else {
                assert_eq!(segment.filename(&block_range), filename);
            }

            // Parsing ignores the configuration suffix, so both forms must round-trip.
            assert_eq!(StaticFileSegment::parse_filename(filename), Some((segment, block_range)));
        }

        // Names with a missing or empty block number are rejected.
        assert_eq!(StaticFileSegment::parse_filename("static_file_headers_2"), None);
        assert_eq!(StaticFileSegment::parse_filename("static_file_headers_"), None);

        let dummy_range = SegmentRangeInclusive::new(123, 1230);
        // Every segment must round-trip through `filename` / `parse_filename`.
        for segment in StaticFileSegment::iter() {
            let filename = segment.filename(&dummy_range);
            assert_eq!(Some((segment, dummy_range)), StaticFileSegment::parse_filename(&filename));
        }
    }

    /// Round-trips one header per segment through `NippyJar` serialization and compares the
    /// serialized bytes against the stored snapshots.
    #[test]
    fn test_segment_config_serialization() {
        let segments = vec![
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: Some(SegmentRangeInclusive::new(0, 100)),
                tx_range: None,
                segment: StaticFileSegment::Headers,
                changeset_offsets_len: 0,
            },
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: None,
                tx_range: Some(SegmentRangeInclusive::new(0, 300)),
                segment: StaticFileSegment::Transactions,
                changeset_offsets_len: 0,
            },
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: Some(SegmentRangeInclusive::new(0, 100)),
                tx_range: Some(SegmentRangeInclusive::new(0, 300)),
                segment: StaticFileSegment::Receipts,
                changeset_offsets_len: 0,
            },
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: Some(SegmentRangeInclusive::new(0, 100)),
                tx_range: Some(SegmentRangeInclusive::new(0, 300)),
                segment: StaticFileSegment::TransactionSenders,
                changeset_offsets_len: 0,
            },
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: Some(SegmentRangeInclusive::new(0, 100)),
                tx_range: Some(SegmentRangeInclusive::new(0, 300)),
                segment: StaticFileSegment::AccountChangeSets,
                changeset_offsets_len: 100,
            },
            SegmentHeader {
                expected_block_range: SegmentRangeInclusive::new(0, 200),
                block_range: Some(SegmentRangeInclusive::new(0, 100)),
                tx_range: None,
                segment: StaticFileSegment::StorageChangeSets,
                changeset_offsets_len: 100,
            },
        ];
        // Guard: the list above must cover every segment, in declaration order.
        assert_eq!(
            segments.iter().map(|segment| segment.segment()).collect::<Vec<_>>(),
            StaticFileSegment::iter().collect::<Vec<_>>()
        );

        for header in segments {
            let segment_jar = NippyJar::new(1, &temp_dir(), header.clone());
            let mut serialized = Vec::new();
            segment_jar.save_to_writer(&mut serialized).unwrap();

            let deserialized =
                NippyJar::<SegmentHeader>::load_from_reader(&serialized[..]).unwrap();
            assert_eq!(deserialized.user_header(), segment_jar.user_header());

            // The snapshot is named after the segment's `Display` output.
            insta::assert_snapshot!(header.segment().to_string(), Bytes::from(serialized));
        }
    }

    /// Ensures `as_str` and `from_str` agree and that the string names stay stable.
    #[test]
    fn test_static_file_segment_str_roundtrip() {
        for segment in StaticFileSegment::iter() {
            let static_str = segment.as_str();
            assert_eq!(StaticFileSegment::from_str(static_str).unwrap(), segment);

            let expected_str = match segment {
                StaticFileSegment::Headers => "headers",
                StaticFileSegment::Transactions => "transactions",
                StaticFileSegment::Receipts => "receipts",
                StaticFileSegment::TransactionSenders => "transaction-senders",
                StaticFileSegment::AccountChangeSets => "account-change-sets",
                StaticFileSegment::StorageChangeSets => "storage-change-sets",
            };
            assert_eq!(static_str, expected_str);
        }
    }

    /// Ensures the serde (JSON) representation round-trips and uses the variant names.
    #[test]
    fn test_static_file_segment_serde_roundtrip() {
        for segment in StaticFileSegment::iter() {
            let ser = serde_json::to_string(&segment).unwrap();
            assert_eq!(serde_json::from_str::<StaticFileSegment>(&ser).unwrap(), segment);

            let expected_str = match segment {
                StaticFileSegment::Headers => "Headers",
                StaticFileSegment::Transactions => "Transactions",
                StaticFileSegment::Receipts => "Receipts",
                StaticFileSegment::TransactionSenders => "TransactionSenders",
                StaticFileSegment::AccountChangeSets => "AccountChangeSets",
                StaticFileSegment::StorageChangeSets => "StorageChangeSets",
            };
            assert_eq!(ser, format!("\"{expected_str}\""));
        }
    }

    /// Ensures every short segment name is at most 15 characters long.
    #[test]
    fn test_static_file_segment_as_short_str() {
        for segment in StaticFileSegment::iter() {
            assert!(
                segment.as_short_str().len() <= 15,
                "{segment} segment name is too long: {}",
                segment.as_short_str()
            );
        }
    }
}