reth_rpc_eth_types/
fee_history.rs

1//! Consist of types adjacent to the fee history cache and its configs
2
3use std::{
4    collections::{BTreeMap, VecDeque},
5    fmt::Debug,
6    sync::{atomic::Ordering::SeqCst, Arc},
7};
8
9use alloy_consensus::{BlockHeader, Header, Transaction, TxReceipt};
10use alloy_eips::eip7840::BlobParams;
11use alloy_rpc_types_eth::TxGasAndReward;
12use futures::{
13    future::{Fuse, FusedFuture},
14    FutureExt, Stream, StreamExt,
15};
16use metrics::atomics::AtomicU64;
17use reth_chain_state::CanonStateNotification;
18use reth_chainspec::{ChainSpecProvider, EthChainSpec};
19use reth_primitives_traits::{Block, BlockBody, NodePrimitives, SealedBlock};
20use reth_rpc_server_types::constants::gas_oracle::MAX_HEADER_HISTORY;
21use reth_storage_api::BlockReaderIdExt;
22use serde::{Deserialize, Serialize};
23use tracing::trace;
24
25use crate::utils::checked_blob_gas_used_ratio;
26
27use super::{EthApiError, EthStateCache};
28
29/// Contains cached fee history entries for blocks.
30///
31/// Purpose for this is to provide cached data for `eth_feeHistory`.
32#[derive(Debug, Clone)]
33pub struct FeeHistoryCache<H> {
34    inner: Arc<FeeHistoryCacheInner<H>>,
35}
36
37impl<H> FeeHistoryCache<H>
38where
39    H: BlockHeader + Clone,
40{
41    /// Creates new `FeeHistoryCache` instance, initialize it with the more recent data, set bounds
42    pub fn new(config: FeeHistoryCacheConfig) -> Self {
43        let inner = FeeHistoryCacheInner {
44            lower_bound: Default::default(),
45            upper_bound: Default::default(),
46            config,
47            entries: Default::default(),
48        };
49        Self { inner: Arc::new(inner) }
50    }
51
52    /// How the cache is configured.
53    #[inline]
54    pub fn config(&self) -> &FeeHistoryCacheConfig {
55        &self.inner.config
56    }
57
58    /// Returns the configured resolution for percentile approximation.
59    #[inline]
60    pub fn resolution(&self) -> u64 {
61        self.config().resolution
62    }
63
64    /// Returns all blocks that are missing in the cache in the [`lower_bound`, `upper_bound`]
65    /// range.
66    ///
67    /// This function is used to populate the cache with missing blocks, which can happen if the
68    /// node switched to stage sync node.
69    async fn missing_consecutive_blocks(&self) -> VecDeque<u64> {
70        let entries = self.inner.entries.read().await;
71        (self.lower_bound()..self.upper_bound())
72            .rev()
73            .filter(|&block_number| !entries.contains_key(&block_number))
74            .collect()
75    }
76
77    /// Insert block data into the cache.
78    async fn insert_blocks<'a, I, B, R, C>(&self, blocks: I, chain_spec: &C)
79    where
80        B: Block<Header = H> + 'a,
81        R: TxReceipt + 'a,
82        I: IntoIterator<Item = (&'a SealedBlock<B>, &'a [R])>,
83        C: EthChainSpec,
84    {
85        let mut entries = self.inner.entries.write().await;
86
87        let percentiles = self.predefined_percentiles();
88        // Insert all new blocks and calculate approximated rewards
89        for (block, receipts) in blocks {
90            let mut fee_history_entry = FeeHistoryEntry::<H>::new(
91                block,
92                chain_spec.blob_params_at_timestamp(block.header().timestamp()),
93            );
94            fee_history_entry.rewards = calculate_reward_percentiles_for_block(
95                &percentiles,
96                fee_history_entry.header.gas_used(),
97                fee_history_entry.header.base_fee_per_gas().unwrap_or_default(),
98                block.body().transactions(),
99                receipts,
100            )
101            .unwrap_or_default();
102            entries.insert(block.number(), fee_history_entry);
103        }
104
105        // enforce bounds by popping the oldest entries
106        while entries.len() > self.inner.config.max_blocks as usize {
107            entries.pop_first();
108        }
109
110        if entries.is_empty() {
111            self.inner.upper_bound.store(0, SeqCst);
112            self.inner.lower_bound.store(0, SeqCst);
113            return
114        }
115
116        let upper_bound = *entries.last_entry().expect("Contains at least one entry").key();
117
118        // also enforce proper lower bound in case we have gaps
119        let target_lower = upper_bound.saturating_sub(self.inner.config.max_blocks);
120        while entries.len() > 1 && *entries.first_key_value().unwrap().0 < target_lower {
121            entries.pop_first();
122        }
123
124        let lower_bound = *entries.first_entry().expect("Contains at least one entry").key();
125        self.inner.upper_bound.store(upper_bound, SeqCst);
126        self.inner.lower_bound.store(lower_bound, SeqCst);
127    }
128
129    /// Get `UpperBound` value for `FeeHistoryCache`
130    pub fn upper_bound(&self) -> u64 {
131        self.inner.upper_bound.load(SeqCst)
132    }
133
134    /// Get `LowerBound` value for `FeeHistoryCache`
135    pub fn lower_bound(&self) -> u64 {
136        self.inner.lower_bound.load(SeqCst)
137    }
138
139    /// Collect fee history for the given range (inclusive `start_block..=end_block`).
140    ///
141    /// This function retrieves fee history entries from the cache for the specified range.
142    /// If the requested range (`start_block` to `end_block`) is within the cache bounds,
143    /// it returns the corresponding entries.
144    /// Otherwise it returns None.
145    pub async fn get_history(
146        &self,
147        start_block: u64,
148        end_block: u64,
149    ) -> Option<Vec<FeeHistoryEntry<H>>> {
150        if end_block < start_block {
151            // invalid range, return None
152            return None
153        }
154        let lower_bound = self.lower_bound();
155        let upper_bound = self.upper_bound();
156        if start_block >= lower_bound && end_block <= upper_bound {
157            let entries = self.inner.entries.read().await;
158            let result = entries
159                .range(start_block..=end_block)
160                .map(|(_, fee_entry)| fee_entry.clone())
161                .collect::<Vec<_>>();
162
163            if result.is_empty() {
164                return None
165            }
166
167            Some(result)
168        } else {
169            None
170        }
171    }
172
173    /// Generates predefined set of percentiles
174    ///
175    /// This returns 100 * resolution points
176    pub fn predefined_percentiles(&self) -> Vec<f64> {
177        let res = self.resolution() as f64;
178        (0..=100 * self.resolution()).map(|p| p as f64 / res).collect()
179    }
180}
181
182/// Settings for the [`FeeHistoryCache`].
183#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
184#[serde(rename_all = "camelCase")]
185pub struct FeeHistoryCacheConfig {
186    /// Max number of blocks in cache.
187    ///
188    /// Default is [`MAX_HEADER_HISTORY`] plus some change to also serve slightly older blocks from
189    /// cache, since `fee_history` supports the entire range
190    pub max_blocks: u64,
191    /// Percentile approximation resolution
192    ///
193    /// Default is 4 which means 0.25
194    pub resolution: u64,
195}
196
197impl Default for FeeHistoryCacheConfig {
198    fn default() -> Self {
199        Self { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
200    }
201}
202
203/// Container type for shared state in [`FeeHistoryCache`]
204#[derive(Debug)]
205struct FeeHistoryCacheInner<H> {
206    /// Stores the lower bound of the cache
207    lower_bound: AtomicU64,
208    /// Stores the upper bound of the cache
209    upper_bound: AtomicU64,
210    /// Config for `FeeHistoryCache`, consists of resolution for percentile approximation
211    /// and max number of blocks
212    config: FeeHistoryCacheConfig,
213    /// Stores the entries of the cache
214    entries: tokio::sync::RwLock<BTreeMap<u64, FeeHistoryEntry<H>>>,
215}
216
217/// Awaits for new chain events and directly inserts them into the cache so they're available
218/// immediately before they need to be fetched from disk.
219pub async fn fee_history_cache_new_blocks_task<St, Provider, N>(
220    fee_history_cache: FeeHistoryCache<N::BlockHeader>,
221    mut events: St,
222    provider: Provider,
223    cache: EthStateCache<N>,
224) where
225    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
226    Provider:
227        BlockReaderIdExt<Block = N::Block, Receipt = N::Receipt> + ChainSpecProvider + 'static,
228    N: NodePrimitives,
229    N::BlockHeader: BlockHeader + Clone,
230{
231    // We're listening for new blocks emitted when the node is in live sync.
232    // If the node transitions to stage sync, we need to fetch the missing blocks
233    let mut missing_blocks = VecDeque::new();
234    let mut fetch_missing_block = Fuse::terminated();
235
236    loop {
237        if fetch_missing_block.is_terminated() &&
238            let Some(block_number) = missing_blocks.pop_front()
239        {
240            trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
241            if let Ok(Some(hash)) = provider.block_hash(block_number) {
242                // fetch missing block
243                fetch_missing_block = cache.get_block_and_receipts(hash).boxed().fuse();
244            }
245        }
246
247        let chain_spec = provider.chain_spec();
248
249        tokio::select! {
250            res = &mut fetch_missing_block =>  {
251                if let Ok(res) = res {
252                    let res = res.as_ref()
253                        .map(|(b, r)| (b.sealed_block(), r.as_slice()));
254                    fee_history_cache.insert_blocks(res, &chain_spec).await;
255                }
256            }
257            event = events.next() =>  {
258                let Some(event) = event else {
259                     // the stream ended, we are done
260                    break
261                };
262
263                let committed = event.committed();
264                let blocks_and_receipts = committed
265                    .blocks_and_receipts()
266                    .map(|(block, receipts)| {
267                        (block.sealed_block(), receipts.as_slice())
268                    });
269                fee_history_cache.insert_blocks(blocks_and_receipts, &chain_spec).await;
270
271                // keep track of missing blocks
272                missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
273            }
274        }
275    }
276}
277
278/// Calculates reward percentiles for transactions in a block header.
279/// Given a list of percentiles and a sealed block header, this function computes
280/// the corresponding rewards for the transactions at each percentile.
281///
282/// The results are returned as a vector of U256 values.
283pub fn calculate_reward_percentiles_for_block<T, R>(
284    percentiles: &[f64],
285    gas_used: u64,
286    base_fee_per_gas: u64,
287    transactions: &[T],
288    receipts: &[R],
289) -> Result<Vec<u128>, EthApiError>
290where
291    T: Transaction,
292    R: TxReceipt,
293{
294    let mut transactions = transactions
295        .iter()
296        .zip(receipts)
297        .scan(0, |previous_gas, (tx, receipt)| {
298            // Convert the cumulative gas used in the receipts
299            // to the gas usage by the transaction
300            //
301            // While we will sum up the gas again later, it is worth
302            // noting that the order of the transactions will be different,
303            // so the sum will also be different for each receipt.
304            let gas_used = receipt.cumulative_gas_used() - *previous_gas;
305            *previous_gas = receipt.cumulative_gas_used();
306
307            Some(TxGasAndReward {
308                gas_used,
309                reward: tx.effective_tip_per_gas(base_fee_per_gas).unwrap_or_default(),
310            })
311        })
312        .collect::<Vec<_>>();
313
314    // Sort the transactions by their rewards in ascending order
315    transactions.sort_by_key(|tx| tx.reward);
316
317    // Find the transaction that corresponds to the given percentile
318    //
319    // We use a `tx_index` here that is shared across all percentiles, since we know
320    // the percentiles are monotonically increasing.
321    let mut tx_index = 0;
322    let mut cumulative_gas_used = transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
323    let mut rewards_in_block = Vec::with_capacity(percentiles.len());
324    for percentile in percentiles {
325        // Empty blocks should return in a zero row
326        if transactions.is_empty() {
327            rewards_in_block.push(0);
328            continue
329        }
330
331        let threshold = (gas_used as f64 * percentile / 100.) as u64;
332        while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
333            tx_index += 1;
334            cumulative_gas_used += transactions[tx_index].gas_used;
335        }
336        rewards_in_block.push(transactions[tx_index].reward);
337    }
338
339    Ok(rewards_in_block)
340}
341
342/// A cached entry for a block's fee history.
343#[derive(Debug, Clone)]
344pub struct FeeHistoryEntry<H = Header> {
345    /// The full block header.
346    pub header: H,
347    /// Gas used ratio this block.
348    pub gas_used_ratio: f64,
349    /// The base per blob gas for EIP-4844.
350    /// For pre EIP-4844 equals to zero.
351    pub base_fee_per_blob_gas: Option<u128>,
352    /// Blob gas used ratio for this block.
353    ///
354    /// Calculated as the ratio of blob gas used and the available blob data gas per block.
355    /// Will be zero if no blob gas was used or pre EIP-4844.
356    pub blob_gas_used_ratio: f64,
357    /// Approximated rewards for the configured percentiles.
358    pub rewards: Vec<u128>,
359    /// Blob parameters for this block.
360    pub blob_params: Option<BlobParams>,
361}
362
363impl<H> FeeHistoryEntry<H>
364where
365    H: BlockHeader + Clone,
366{
367    /// Creates a new entry from a sealed block.
368    ///
369    /// Note: This does not calculate the rewards for the block.
370    pub fn new<B>(block: &SealedBlock<B>, blob_params: Option<BlobParams>) -> Self
371    where
372        B: Block<Header = H>,
373    {
374        let header = block.header();
375        Self {
376            header: block.header().clone(),
377            gas_used_ratio: header.gas_used() as f64 / header.gas_limit() as f64,
378            base_fee_per_blob_gas: header
379                .excess_blob_gas()
380                .and_then(|excess_blob_gas| Some(blob_params?.calc_blob_fee(excess_blob_gas))),
381            blob_gas_used_ratio: checked_blob_gas_used_ratio(
382                block.body().blob_gas_used(),
383                blob_params
384                    .as_ref()
385                    .map(|params| params.max_blob_gas_per_block())
386                    .unwrap_or(alloy_eips::eip4844::MAX_DATA_GAS_PER_BLOCK_DENCUN),
387            ),
388            rewards: Vec::new(),
389            blob_params,
390        }
391    }
392
393    /// Returns the blob fee for the next block according to the EIP-4844 spec.
394    ///
395    /// Returns `None` if `excess_blob_gas` is None.
396    ///
397    /// See also [`Self::next_block_excess_blob_gas`]
398    pub fn next_block_blob_fee(&self) -> Option<u128> {
399        self.next_block_excess_blob_gas()
400            .and_then(|excess_blob_gas| Some(self.blob_params?.calc_blob_fee(excess_blob_gas)))
401    }
402
403    /// Calculate excess blob gas for the next block according to the EIP-4844 spec.
404    ///
405    /// Returns a `None` if no excess blob gas is set, no EIP-4844 support
406    pub fn next_block_excess_blob_gas(&self) -> Option<u64> {
407        self.header.excess_blob_gas().and_then(|excess_blob_gas| {
408            Some(self.blob_params?.next_block_excess_blob_gas_osaka(
409                excess_blob_gas,
410                self.header.blob_gas_used()?,
411                self.header.base_fee_per_gas()?,
412            ))
413        })
414    }
415}