1use crate::{BlockNumber, Compression};
2use alloc::{format, string::String, vec::Vec};
3use alloy_primitives::TxNumber;
4use core::{
5 ops::{Range, RangeInclusive},
6 str::FromStr,
7};
8use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Deserializer, Serialize, Serializer};
9use strum::{EnumIs, EnumString};
10
11#[derive(
12 Debug,
13 Copy,
14 Clone,
15 Eq,
16 PartialEq,
17 Hash,
18 Ord,
19 PartialOrd,
20 EnumString,
21 derive_more::Display,
22 EnumIs,
23 Serialize,
24 Deserialize,
25 fixed_map::Key,
26)]
27#[strum(serialize_all = "kebab-case")]
28#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
29pub enum StaticFileSegment {
31 Headers,
34 Transactions,
36 Receipts,
38 TransactionSenders,
40 AccountChangeSets,
58 StorageChangeSets,
63}
64
65impl StaticFileSegment {
66 pub const fn as_str(&self) -> &'static str {
68 match self {
74 Self::Headers => "headers",
75 Self::Transactions => "transactions",
76 Self::Receipts => "receipts",
77 Self::TransactionSenders => "transaction-senders",
78 Self::AccountChangeSets => "account-change-sets",
79 Self::StorageChangeSets => "storage-change-sets",
80 }
81 }
82
83 pub fn iter() -> impl Iterator<Item = Self> {
85 [
87 Self::Headers,
88 Self::Transactions,
89 Self::Receipts,
90 Self::TransactionSenders,
91 Self::AccountChangeSets,
92 Self::StorageChangeSets,
93 ]
94 .into_iter()
95 }
96
97 pub const fn config(&self) -> SegmentConfig {
99 SegmentConfig { compression: Compression::Lz4 }
100 }
101
102 pub const fn columns(&self) -> usize {
104 match self {
105 Self::Headers => 3,
106 Self::Transactions |
107 Self::Receipts |
108 Self::TransactionSenders |
109 Self::AccountChangeSets |
110 Self::StorageChangeSets => 1,
111 }
112 }
113
114 pub fn filename(&self, block_range: &SegmentRangeInclusive) -> String {
116 format!("static_file_{}_{}_{}", self.as_str(), block_range.start(), block_range.end())
119 }
120
121 pub fn filename_with_configuration(
123 &self,
124 compression: Compression,
125 block_range: &SegmentRangeInclusive,
126 ) -> String {
127 let prefix = self.filename(block_range);
128
129 let filters_name = "none";
130
131 format!("{prefix}_{}_{}", filters_name, compression.as_ref())
134 }
135
136 pub fn parse_filename(name: &str) -> Option<(Self, SegmentRangeInclusive)> {
153 let mut parts = name.split('_');
154 if !(parts.next() == Some("static") && parts.next() == Some("file")) {
155 return None
156 }
157
158 let segment = Self::from_str(parts.next()?).ok()?;
159 let (block_start, block_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
160
161 if block_start > block_end {
162 return None
163 }
164
165 Some((segment, SegmentRangeInclusive::new(block_start, block_end)))
166 }
167
168 pub const fn is_tx_based(&self) -> bool {
170 match self {
171 Self::Receipts | Self::Transactions | Self::TransactionSenders => true,
172 Self::Headers | Self::AccountChangeSets | Self::StorageChangeSets => false,
173 }
174 }
175
176 pub const fn is_change_based(&self) -> bool {
178 match self {
179 Self::AccountChangeSets | Self::StorageChangeSets => true,
180 Self::Receipts | Self::Transactions | Self::Headers | Self::TransactionSenders => false,
181 }
182 }
183
184 pub const fn is_block_based(&self) -> bool {
186 match self {
187 Self::Headers => true,
188 Self::Receipts |
189 Self::Transactions |
190 Self::TransactionSenders |
191 Self::AccountChangeSets |
192 Self::StorageChangeSets => false,
193 }
194 }
195
196 pub const fn is_block_or_change_based(&self) -> bool {
199 self.is_block_based() || self.is_change_based()
200 }
201}
202
203#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone)]
205pub struct ChangesetOffset {
206 offset: u64,
208
209 num_changes: u64,
211}
212
213impl ChangesetOffset {
214 pub const fn offset(&self) -> u64 {
216 self.offset
217 }
218
219 pub const fn num_changes(&self) -> u64 {
221 self.num_changes
222 }
223
224 pub const fn changeset_range(&self) -> Range<u64> {
226 self.offset..(self.offset + self.num_changes)
227 }
228}
229
230#[derive(Debug, Eq, PartialEq, Hash, Clone)]
232pub struct SegmentHeader {
233 expected_block_range: SegmentRangeInclusive,
238 block_range: Option<SegmentRangeInclusive>,
240 tx_range: Option<SegmentRangeInclusive>,
242 segment: StaticFileSegment,
244 changeset_offsets: Option<Vec<ChangesetOffset>>,
246}
247
248struct SegmentHeaderVisitor;
249
250impl<'de> Visitor<'de> for SegmentHeaderVisitor {
251 type Value = SegmentHeader;
252
253 fn expecting(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
254 formatter.write_str("a header struct with 4 or 5 fields")
255 }
256
257 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
258 where
259 A: serde::de::SeqAccess<'de>,
260 {
261 let expected_block_range =
263 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
264
265 let block_range =
266 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
267
268 let tx_range =
269 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
270
271 let segment: StaticFileSegment =
272 seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
273
274 let changeset_offsets = if segment.is_change_based() {
275 match seq.next_element()? {
278 Some(Some(offsets)) => Some(offsets),
279 Some(None) => None,
281 None => {
282 return Err(serde::de::Error::custom(
283 "changeset_offsets should exist for static files",
284 ))
285 }
286 }
287 } else {
288 None
289 };
290
291 Ok(SegmentHeader {
292 expected_block_range,
293 block_range,
294 tx_range,
295 segment,
296 changeset_offsets,
297 })
298 }
299}
300
301impl<'de> Deserialize<'de> for SegmentHeader {
302 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
303 where
304 D: Deserializer<'de>,
305 {
306 const FIELDS: &[&str] =
310 &["expected_block_range", "block_range", "tx_range", "segment", "changeset_offsets"];
311
312 deserializer.deserialize_struct("SegmentHeader", FIELDS, SegmentHeaderVisitor)
313 }
314}
315
316impl Serialize for SegmentHeader {
317 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
318 where
319 S: Serializer,
320 {
321 let len = if self.segment.is_change_based() { 5 } else { 4 };
323
324 let mut state = serializer.serialize_struct("SegmentHeader", len)?;
325 state.serialize_field("expected_block_range", &self.expected_block_range)?;
326 state.serialize_field("block_range", &self.block_range)?;
327 state.serialize_field("tx_range", &self.tx_range)?;
328 state.serialize_field("segment", &self.segment)?;
329
330 if self.segment.is_change_based() {
331 state.serialize_field("changeset_offsets", &self.changeset_offsets)?;
332 }
333
334 state.end()
335 }
336}
337
338impl SegmentHeader {
339 pub const fn new(
341 expected_block_range: SegmentRangeInclusive,
342 block_range: Option<SegmentRangeInclusive>,
343 tx_range: Option<SegmentRangeInclusive>,
344 segment: StaticFileSegment,
345 ) -> Self {
346 Self { expected_block_range, block_range, tx_range, segment, changeset_offsets: None }
347 }
348
349 pub const fn segment(&self) -> StaticFileSegment {
351 self.segment
352 }
353
354 pub const fn expected_block_range(&self) -> SegmentRangeInclusive {
356 self.expected_block_range
357 }
358
359 pub const fn block_range(&self) -> Option<SegmentRangeInclusive> {
361 self.block_range
362 }
363
364 pub const fn tx_range(&self) -> Option<SegmentRangeInclusive> {
366 self.tx_range
367 }
368
369 pub const fn changeset_offsets(&self) -> Option<&Vec<ChangesetOffset>> {
371 self.changeset_offsets.as_ref()
372 }
373
374 pub const fn expected_block_start(&self) -> BlockNumber {
376 self.expected_block_range.start()
377 }
378
379 pub const fn expected_block_end(&self) -> BlockNumber {
381 self.expected_block_range.end()
382 }
383
384 pub fn block_start(&self) -> Option<BlockNumber> {
386 self.block_range.as_ref().map(|b| b.start())
387 }
388
389 pub fn block_end(&self) -> Option<BlockNumber> {
391 self.block_range.as_ref().map(|b| b.end())
392 }
393
394 pub fn tx_start(&self) -> Option<TxNumber> {
396 self.tx_range.as_ref().map(|t| t.start())
397 }
398
399 pub fn tx_end(&self) -> Option<TxNumber> {
401 self.tx_range.as_ref().map(|t| t.end())
402 }
403
404 pub fn tx_len(&self) -> Option<u64> {
406 self.tx_range.as_ref().map(|r| r.len())
407 }
408
409 pub fn block_len(&self) -> Option<u64> {
411 self.block_range.as_ref().map(|r| r.len())
412 }
413
414 pub fn increment_block(&mut self) -> BlockNumber {
416 let block_num = if let Some(block_range) = &mut self.block_range {
417 block_range.end += 1;
418 block_range.end
419 } else {
420 self.block_range = Some(SegmentRangeInclusive::new(
421 self.expected_block_start(),
422 self.expected_block_start(),
423 ));
424 self.expected_block_start()
425 };
426
427 if self.segment.is_change_based() {
429 let offsets = self.changeset_offsets.get_or_insert_default();
430 let new_offset = if let Some(last_offset) = offsets.last() {
432 last_offset.offset + last_offset.num_changes
434 } else {
435 0
437 };
438
439 offsets.push(ChangesetOffset { offset: new_offset, num_changes: 0 });
441 }
442
443 block_num
444 }
445
446 pub const fn increment_tx(&mut self) {
448 if self.segment.is_tx_based() {
449 if let Some(tx_range) = &mut self.tx_range {
450 tx_range.end += 1;
451 } else {
452 self.tx_range = Some(SegmentRangeInclusive::new(0, 0));
453 }
454 }
455 }
456
457 pub fn increment_block_changes(&mut self) {
459 debug_assert!(self.segment().is_change_based());
460 if self.segment.is_change_based() {
461 let offsets = self.changeset_offsets.get_or_insert_with(Default::default);
462 if let Some(last_offset) = offsets.last_mut() {
463 last_offset.num_changes += 1;
464 } else {
465 offsets.push(ChangesetOffset { offset: 0, num_changes: 1 });
468 }
469 }
470 }
471
472 pub fn prune(&mut self, num: u64) {
474 if self.segment.is_block_or_change_based() {
476 if let Some(range) = &mut self.block_range {
477 if num > range.end - range.start {
478 self.block_range = None;
479 if self.segment.is_change_based() {
481 self.changeset_offsets = None;
482 }
483 } else {
484 let old_end = range.end;
485 range.end = range.end.saturating_sub(num);
486
487 if self.segment.is_change_based() &&
489 let Some(offsets) = &mut self.changeset_offsets
490 {
491 let blocks_to_remove = old_end - range.end;
493 let new_len = offsets.len().saturating_sub(blocks_to_remove as usize);
495 offsets.truncate(new_len);
496
497 if offsets.is_empty() {
499 self.changeset_offsets = None;
500 }
501 }
502 }
503 };
504 } else if let Some(range) = &mut self.tx_range {
505 if num > range.end - range.start {
506 self.tx_range = None;
507 } else {
508 range.end = range.end.saturating_sub(num);
509 }
510 }
511 }
512
513 pub const fn set_block_range(&mut self, block_start: BlockNumber, block_end: BlockNumber) {
515 if let Some(block_range) = &mut self.block_range {
516 block_range.start = block_start;
517 block_range.end = block_end;
518 } else {
519 self.block_range = Some(SegmentRangeInclusive::new(block_start, block_end))
520 }
521 }
522
523 pub fn sync_changeset_offsets(&mut self) {
528 if !self.segment.is_change_based() {
529 return;
530 }
531
532 if let Some(block_range) = &self.block_range {
533 if let Some(offsets) = &mut self.changeset_offsets {
534 let expected_len = (block_range.end - block_range.start + 1) as usize;
535 if offsets.len() > expected_len {
536 offsets.truncate(expected_len);
537 if offsets.is_empty() {
538 self.changeset_offsets = None;
539 }
540 }
541 }
542 } else {
543 self.changeset_offsets = None;
545 }
546 }
547
548 pub const fn set_tx_range(&mut self, tx_start: TxNumber, tx_end: TxNumber) {
550 if let Some(tx_range) = &mut self.tx_range {
551 tx_range.start = tx_start;
552 tx_range.end = tx_end;
553 } else {
554 self.tx_range = Some(SegmentRangeInclusive::new(tx_start, tx_end))
555 }
556 }
557
558 pub fn start(&self) -> Option<u64> {
560 if self.segment.is_change_based() {
561 return Some(0)
562 }
563
564 if self.segment.is_block_based() {
565 return self.block_start()
566 }
567 self.tx_start()
568 }
569
570 pub fn changeset_offset(&self, block: BlockNumber) -> Option<&ChangesetOffset> {
576 let block_range = self.block_range()?;
577 if !block_range.contains(block) {
578 return None
579 }
580
581 let offsets = self.changeset_offsets.as_ref()?;
582 let index = (block - block_range.start()) as usize;
583
584 offsets.get(index)
585 }
586}
587
588#[derive(Debug, Clone, Copy)]
590pub struct SegmentConfig {
591 pub compression: Compression,
593}
594
595#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
599pub struct SegmentRangeInclusive {
600 start: u64,
601 end: u64,
602}
603
604impl SegmentRangeInclusive {
605 pub const fn new(start: u64, end: u64) -> Self {
607 Self { start, end }
608 }
609
610 pub const fn start(&self) -> u64 {
612 self.start
613 }
614
615 pub const fn end(&self) -> u64 {
617 self.end
618 }
619
620 pub const fn len(&self) -> u64 {
622 self.end.saturating_sub(self.start).saturating_add(1)
623 }
624
625 pub const fn is_empty(&self) -> bool {
627 self.start > self.end
628 }
629
630 pub fn contains(&self, number: u64) -> bool {
632 (self.start..=self.end).contains(&number)
633 }
634}
635
636impl core::fmt::Display for SegmentRangeInclusive {
637 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
638 write!(f, "{}..={}", self.start, self.end)
639 }
640}
641
642impl From<RangeInclusive<u64>> for SegmentRangeInclusive {
643 fn from(value: RangeInclusive<u64>) -> Self {
644 Self { start: *value.start(), end: *value.end() }
645 }
646}
647
648impl From<&SegmentRangeInclusive> for RangeInclusive<u64> {
649 fn from(value: &SegmentRangeInclusive) -> Self {
650 value.start()..=value.end()
651 }
652}
653
654impl From<SegmentRangeInclusive> for RangeInclusive<u64> {
655 fn from(value: SegmentRangeInclusive) -> Self {
656 (&value).into()
657 }
658}
659
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::Bytes;
    use reth_nippy_jar::NippyJar;
    use std::env::temp_dir;

    /// File names are generated as expected and parse back to the same segment and range.
    #[test]
    fn test_filename() {
        // (segment, block range, expected file name, compression for the configured variant)
        let test_vectors = [
            (StaticFileSegment::Headers, 2..=30, "static_file_headers_2_30", None),
            (StaticFileSegment::Receipts, 30..=300, "static_file_receipts_30_300", None),
            (
                StaticFileSegment::Transactions,
                1_123_233..=11_223_233,
                "static_file_transactions_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::AccountChangeSets,
                1_123_233..=11_223_233,
                "static_file_account-change-sets_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::StorageChangeSets,
                1_123_233..=11_223_233,
                "static_file_storage-change-sets_1123233_11223233",
                None,
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_lz4",
                Some(Compression::Lz4),
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_zstd",
                Some(Compression::Zstd),
            ),
            (
                StaticFileSegment::Headers,
                2..=30,
                "static_file_headers_2_30_none_zstd-dict",
                Some(Compression::ZstdWithDictionary),
            ),
        ];

        for (segment, block_range, filename, compression) in test_vectors {
            let block_range = SegmentRangeInclusive::from(block_range);
            let generated = match compression {
                Some(compression) => segment.filename_with_configuration(compression, &block_range),
                None => segment.filename(&block_range),
            };
            assert_eq!(generated, filename);

            assert_eq!(StaticFileSegment::parse_filename(filename), Some((segment, block_range)));
        }

        // Names that stop before the end of the block range are rejected.
        assert_eq!(StaticFileSegment::parse_filename("static_file_headers_2"), None);
        assert_eq!(StaticFileSegment::parse_filename("static_file_headers_"), None);

        // Every segment round-trips through `filename` and `parse_filename`.
        let dummy_range = SegmentRangeInclusive::new(123, 1230);
        for segment in StaticFileSegment::iter() {
            let filename = segment.filename(&dummy_range);
            assert_eq!(Some((segment, dummy_range)), StaticFileSegment::parse_filename(&filename));
        }
    }

    /// Headers of every segment survive a `NippyJar` save/load round trip and match their
    /// stored snapshots.
    #[test]
    fn test_segment_config_serialization() {
        let expected_block_range = SegmentRangeInclusive::new(0, 200);
        let blocks = Some(SegmentRangeInclusive::new(0, 100));
        let txs = Some(SegmentRangeInclusive::new(0, 300));
        let offsets = Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]);

        // (block_range, tx_range, segment, changeset_offsets), one entry per segment.
        let segments: Vec<SegmentHeader> = [
            (blocks, None, StaticFileSegment::Headers, None),
            (None, txs, StaticFileSegment::Transactions, None),
            (blocks, txs, StaticFileSegment::Receipts, None),
            (blocks, txs, StaticFileSegment::TransactionSenders, None),
            (blocks, txs, StaticFileSegment::AccountChangeSets, offsets.clone()),
            (blocks, None, StaticFileSegment::StorageChangeSets, offsets),
        ]
        .into_iter()
        .map(|(block_range, tx_range, segment, changeset_offsets)| SegmentHeader {
            expected_block_range,
            block_range,
            tx_range,
            segment,
            changeset_offsets,
        })
        .collect();

        // Guard against a new segment being added without a matching header above.
        assert_eq!(
            segments.iter().map(SegmentHeader::segment).collect::<Vec<_>>(),
            StaticFileSegment::iter().collect::<Vec<_>>()
        );

        for header in segments {
            let segment_jar = NippyJar::new(1, &temp_dir(), header.clone());
            let mut serialized = Vec::new();
            segment_jar.save_to_writer(&mut serialized).unwrap();

            let deserialized =
                NippyJar::<SegmentHeader>::load_from_reader(&serialized[..]).unwrap();
            assert_eq!(deserialized.user_header(), segment_jar.user_header());

            insta::assert_snapshot!(header.segment().to_string(), Bytes::from(serialized));
        }
    }

    /// `as_str` and `FromStr` agree, and the kebab-case names stay stable.
    #[test]
    fn test_static_file_segment_str_roundtrip() {
        for segment in StaticFileSegment::iter() {
            let name = segment.as_str();
            assert_eq!(name.parse::<StaticFileSegment>().unwrap(), segment);

            let expected_name = match segment {
                StaticFileSegment::Headers => "headers",
                StaticFileSegment::Transactions => "transactions",
                StaticFileSegment::Receipts => "receipts",
                StaticFileSegment::TransactionSenders => "transaction-senders",
                StaticFileSegment::AccountChangeSets => "account-change-sets",
                StaticFileSegment::StorageChangeSets => "storage-change-sets",
            };
            assert_eq!(name, expected_name);
        }
    }

    /// Serde uses the variant identifiers, and they stay stable.
    #[test]
    fn test_static_file_segment_serde_roundtrip() {
        for segment in StaticFileSegment::iter() {
            let json = serde_json::to_string(&segment).unwrap();
            assert_eq!(serde_json::from_str::<StaticFileSegment>(&json).unwrap(), segment);

            let expected_variant = match segment {
                StaticFileSegment::Headers => "Headers",
                StaticFileSegment::Transactions => "Transactions",
                StaticFileSegment::Receipts => "Receipts",
                StaticFileSegment::TransactionSenders => "TransactionSenders",
                StaticFileSegment::AccountChangeSets => "AccountChangeSets",
                StaticFileSegment::StorageChangeSets => "StorageChangeSets",
            };
            assert_eq!(json, format!("\"{expected_variant}\""));
        }
    }
}