Skip to main content

reth_provider/providers/static_file/
jar.rs

1use super::{
2    metrics::{StaticFileProviderMetrics, StaticFileProviderOperation},
3    LoadedJarRef,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider,
7    TransactionsProvider,
8};
9use alloy_consensus::transaction::TransactionMeta;
10use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
11use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
12use reth_chainspec::ChainInfo;
13use reth_db::static_file::{
14    BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask, StaticFileCursor, TransactionMask,
15    TransactionSenderMask,
16};
17use reth_db_api::table::{Decompress, Value};
18use reth_node_types::NodePrimitives;
19use reth_primitives_traits::{SealedHeader, SignedTransaction};
20use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
21use reth_storage_api::range_size_hint;
22use reth_storage_errors::provider::{ProviderError, ProviderResult};
23use std::{
24    fmt::Debug,
25    ops::{Deref, RangeBounds, RangeInclusive},
26    sync::Arc,
27};
28/// Provider over a specific `NippyJar` and range.
29#[derive(Debug)]
30pub struct StaticFileJarProvider<'a, N> {
31    /// Main static file segment
32    jar: LoadedJarRef<'a>,
33    /// Another kind of static file segment to help query data from the main one.
34    auxiliary_jar: Option<Box<Self>>,
35    /// Metrics for the static files.
36    metrics: Option<Arc<StaticFileProviderMetrics>>,
37    /// Node primitives
38    _pd: std::marker::PhantomData<N>,
39}
40
41impl<'a, N: NodePrimitives> Deref for StaticFileJarProvider<'a, N> {
42    type Target = LoadedJarRef<'a>;
43    fn deref(&self) -> &Self::Target {
44        &self.jar
45    }
46}
47
48impl<'a, N: NodePrimitives> From<LoadedJarRef<'a>> for StaticFileJarProvider<'a, N> {
49    fn from(value: LoadedJarRef<'a>) -> Self {
50        StaticFileJarProvider {
51            jar: value,
52            auxiliary_jar: None,
53            metrics: None,
54            _pd: Default::default(),
55        }
56    }
57}
58
59impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
60    /// Provides a cursor for more granular data access.
61    pub fn cursor<'b>(&'b self) -> ProviderResult<StaticFileCursor<'a>>
62    where
63        'b: 'a,
64    {
65        let result = StaticFileCursor::new(self.value(), self.mmap_handle())?;
66
67        if let Some(metrics) = &self.metrics {
68            metrics.record_segment_operation(
69                self.segment(),
70                StaticFileProviderOperation::InitCursor,
71                None,
72            );
73        }
74
75        Ok(result)
76    }
77
78    /// Adds a new auxiliary static file to help query data from the main one
79    pub fn with_auxiliary(mut self, auxiliary_jar: Self) -> Self {
80        self.auxiliary_jar = Some(Box::new(auxiliary_jar));
81        self
82    }
83
84    /// Enables metrics on the provider.
85    pub fn with_metrics(mut self, metrics: Arc<StaticFileProviderMetrics>) -> Self {
86        self.metrics = Some(metrics);
87        self
88    }
89
90    /// Returns the total size of the data and offsets files (from the in-memory mmap).
91    pub fn size(&self) -> usize {
92        self.jar.value().size()
93    }
94
95    /// Reads a changeset offset from the sidecar file for a given block.
96    ///
97    /// Returns `None` if:
98    /// - The segment is not change-based
99    /// - The block is not in the block range
100    /// - The sidecar file doesn't exist
101    pub fn read_changeset_offset(
102        &self,
103        block_number: BlockNumber,
104    ) -> ProviderResult<Option<ChangesetOffset>> {
105        let header = self.user_header();
106        if !header.segment().is_change_based() {
107            return Ok(None);
108        }
109
110        let Some(index) = header.changeset_offset_index(block_number) else {
111            return Ok(None);
112        };
113
114        let csoff_path = self.data_path().with_extension("csoff");
115        if !csoff_path.exists() {
116            return Ok(None);
117        }
118
119        let len = header.changeset_offsets_len();
120        let mut reader =
121            ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
122        reader.get(index).map_err(ProviderError::other)
123    }
124
125    /// Reads all changeset offsets from the sidecar file.
126    ///
127    /// Returns `None` if:
128    /// - The segment is not change-based
129    /// - The sidecar file doesn't exist
130    pub fn read_changeset_offsets(&self) -> ProviderResult<Option<Vec<ChangesetOffset>>> {
131        let header = self.user_header();
132        if !header.segment().is_change_based() {
133            return Ok(None);
134        }
135
136        let len = header.changeset_offsets_len();
137        if len == 0 {
138            return Ok(Some(Vec::new()));
139        }
140
141        let csoff_path = self.data_path().with_extension("csoff");
142        if !csoff_path.exists() {
143            return Ok(None);
144        }
145
146        let mut reader =
147            ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
148        let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
149        Ok(Some(offsets))
150    }
151}
152
153impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProvider<'_, N> {
154    type Header = N::BlockHeader;
155
156    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
157        Ok(self
158            .cursor()?
159            .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
160            .filter(|(_, hash)| hash == &block_hash)
161            .map(|(header, _)| header))
162    }
163
164    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
165        self.cursor()?.get_one::<HeaderMask<Self::Header>>(num.into())
166    }
167
168    fn headers_range(
169        &self,
170        range: impl RangeBounds<BlockNumber>,
171    ) -> ProviderResult<Vec<Self::Header>> {
172        let mut cursor = self.cursor()?;
173        let mut headers = Vec::with_capacity(range_size_hint(&range).unwrap_or(1024));
174
175        for num in to_range(range) {
176            if let Some(header) = cursor.get_one::<HeaderMask<Self::Header>>(num.into())? {
177                headers.push(header);
178            }
179        }
180
181        Ok(headers)
182    }
183
184    fn sealed_header(
185        &self,
186        number: BlockNumber,
187    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
188        Ok(self
189            .cursor()?
190            .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
191            .map(|(header, hash)| SealedHeader::new(header, hash)))
192    }
193
194    fn sealed_headers_while(
195        &self,
196        range: impl RangeBounds<BlockNumber>,
197        mut predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
198    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
199        let mut cursor = self.cursor()?;
200        let mut headers = Vec::with_capacity(range_size_hint(&range).unwrap_or(1024));
201
202        for number in to_range(range) {
203            if let Some((header, hash)) =
204                cursor.get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
205            {
206                let sealed = SealedHeader::new(header, hash);
207                if !predicate(&sealed) {
208                    break
209                }
210                headers.push(sealed);
211            }
212        }
213        Ok(headers)
214    }
215}
216
217impl<N: NodePrimitives> BlockHashReader for StaticFileJarProvider<'_, N> {
218    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
219        self.cursor()?.get_one::<BlockHashMask>(number.into())
220    }
221
222    fn canonical_hashes_range(
223        &self,
224        start: BlockNumber,
225        end: BlockNumber,
226    ) -> ProviderResult<Vec<B256>> {
227        let mut cursor = self.cursor()?;
228        let mut hashes = Vec::with_capacity((end - start) as usize);
229
230        for number in start..end {
231            if let Some(hash) = cursor.get_one::<BlockHashMask>(number.into())? {
232                hashes.push(hash)
233            }
234        }
235        Ok(hashes)
236    }
237}
238
239impl<N: NodePrimitives> BlockNumReader for StaticFileJarProvider<'_, N> {
240    fn chain_info(&self) -> ProviderResult<ChainInfo> {
241        // Information on live database
242        Err(ProviderError::UnsupportedProvider)
243    }
244
245    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
246        // Information on live database
247        Err(ProviderError::UnsupportedProvider)
248    }
249
250    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
251        // Information on live database
252        Err(ProviderError::UnsupportedProvider)
253    }
254
255    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
256        let mut cursor = self.cursor()?;
257
258        Ok(cursor
259            .get_one::<BlockHashMask>((&hash).into())?
260            .and_then(|res| (res == hash).then(|| cursor.number()).flatten()))
261    }
262}
263
264impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
265    for StaticFileJarProvider<'_, N>
266{
267    type Transaction = N::SignedTx;
268
269    fn transaction_id(&self, hash: TxHash) -> ProviderResult<Option<TxNumber>> {
270        let mut cursor = self.cursor()?;
271
272        Ok(cursor
273            .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
274            .and_then(|res| (res.trie_hash() == hash).then(|| cursor.number()).flatten()))
275    }
276
277    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
278        self.cursor()?.get_one::<TransactionMask<Self::Transaction>>(num.into())
279    }
280
281    fn transaction_by_id_unhashed(
282        &self,
283        num: TxNumber,
284    ) -> ProviderResult<Option<Self::Transaction>> {
285        self.cursor()?.get_one::<TransactionMask<Self::Transaction>>(num.into())
286    }
287
288    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
289        self.cursor()?.get_one::<TransactionMask<Self::Transaction>>((&hash).into())
290    }
291
292    fn transaction_by_hash_with_meta(
293        &self,
294        _hash: TxHash,
295    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
296        // Information required on indexing table [`tables::TransactionBlocks`]
297        Err(ProviderError::UnsupportedProvider)
298    }
299
300    fn transactions_by_block(
301        &self,
302        _block_id: BlockHashOrNumber,
303    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
304        // Related to indexing tables. Live database should get the tx_range and call static file
305        // provider with `transactions_by_tx_range` instead.
306        Err(ProviderError::UnsupportedProvider)
307    }
308
309    fn transactions_by_block_range(
310        &self,
311        _range: impl RangeBounds<BlockNumber>,
312    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
313        // Related to indexing tables. Live database should get the tx_range and call static file
314        // provider with `transactions_by_tx_range` instead.
315        Err(ProviderError::UnsupportedProvider)
316    }
317
318    fn transactions_by_tx_range(
319        &self,
320        range: impl RangeBounds<TxNumber>,
321    ) -> ProviderResult<Vec<Self::Transaction>> {
322        let mut cursor = self.cursor()?;
323        let mut txs = Vec::with_capacity(range_size_hint(&range).unwrap_or(1024));
324
325        for num in to_range(range) {
326            if let Some(tx) = cursor.get_one::<TransactionMask<Self::Transaction>>(num.into())? {
327                txs.push(tx)
328            }
329        }
330        Ok(txs)
331    }
332
333    fn senders_by_tx_range(
334        &self,
335        range: impl RangeBounds<TxNumber>,
336    ) -> ProviderResult<Vec<Address>> {
337        let mut cursor = self.cursor()?;
338        let mut senders = Vec::with_capacity(range_size_hint(&range).unwrap_or(1024));
339
340        for num in to_range(range) {
341            if let Some(tx) = cursor.get_one::<TransactionSenderMask>(num.into())? {
342                senders.push(tx)
343            }
344        }
345        Ok(senders)
346    }
347
348    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
349        self.cursor()?.get_one::<TransactionSenderMask>(id.into())
350    }
351}
352
353impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction, Receipt: Decompress>>
354    ReceiptProvider for StaticFileJarProvider<'_, N>
355{
356    type Receipt = N::Receipt;
357
358    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
359        self.cursor()?.get_one::<ReceiptMask<Self::Receipt>>(num.into())
360    }
361
362    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
363        if let Some(tx_static_file) = &self.auxiliary_jar &&
364            let Some(num) = tx_static_file.transaction_id(hash)?
365        {
366            return self.receipt(num)
367        }
368        Ok(None)
369    }
370
371    fn receipts_by_block(
372        &self,
373        _block: BlockHashOrNumber,
374    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
375        // Related to indexing tables. StaticFile should get the tx_range and call static file
376        // provider with `receipt()` instead for each
377        Err(ProviderError::UnsupportedProvider)
378    }
379
380    fn receipts_by_tx_range(
381        &self,
382        range: impl RangeBounds<TxNumber>,
383    ) -> ProviderResult<Vec<Self::Receipt>> {
384        let mut cursor = self.cursor()?;
385        let mut receipts = Vec::with_capacity(range_size_hint(&range).unwrap_or(1024));
386
387        for num in to_range(range) {
388            if let Some(tx) = cursor.get_one::<ReceiptMask<Self::Receipt>>(num.into())? {
389                receipts.push(tx)
390            }
391        }
392        Ok(receipts)
393    }
394
395    fn receipts_by_block_range(
396        &self,
397        _block_range: RangeInclusive<BlockNumber>,
398    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
399        // Related to indexing tables. StaticFile should get the tx_range and call static file
400        // provider with `receipt()` instead for each
401        Err(ProviderError::UnsupportedProvider)
402    }
403}