1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
//! Consist of types adjacent to the fee history cache and its configs

use std::{
    collections::{BTreeMap, VecDeque},
    fmt::Debug,
    sync::{atomic::Ordering::SeqCst, Arc},
};

use futures::{
    future::{Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use metrics::atomics::AtomicU64;
use reth_chainspec::ChainSpec;
use reth_primitives::{
    basefee::calc_next_block_base_fee,
    eip4844::{calc_blob_gasprice, calculate_excess_blob_gas},
    Receipt, SealedBlock, TransactionSigned, B256,
};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider};
use reth_rpc_types::TxGasAndReward;
use serde::{Deserialize, Serialize};
use tracing::trace;

use reth_rpc_server_types::constants::gas_oracle::MAX_HEADER_HISTORY;

use super::{EthApiError, EthStateCache};

/// Contains cached fee history entries for blocks.
///
/// Purpose for this is to provide cached data for `eth_feeHistory`.
#[derive(Debug, Clone)]
pub struct FeeHistoryCache {
    inner: Arc<FeeHistoryCacheInner>,
}

impl FeeHistoryCache {
    /// Creates new `FeeHistoryCache` instance, initialize it with the more recent data, set bounds
    pub fn new(eth_cache: EthStateCache, config: FeeHistoryCacheConfig) -> Self {
        let inner = FeeHistoryCacheInner {
            lower_bound: Default::default(),
            upper_bound: Default::default(),
            config,
            entries: Default::default(),
            eth_cache,
        };
        Self { inner: Arc::new(inner) }
    }

    /// How the cache is configured.
    #[inline]
    pub fn config(&self) -> &FeeHistoryCacheConfig {
        &self.inner.config
    }

    /// Returns the configured resolution for percentile approximation.
    #[inline]
    pub fn resolution(&self) -> u64 {
        self.config().resolution
    }

    /// Returns all blocks that are missing in the cache in the [`lower_bound`, `upper_bound`]
    /// range.
    ///
    /// This function is used to populate the cache with missing blocks, which can happen if the
    /// node switched to stage sync node.
    async fn missing_consecutive_blocks(&self) -> VecDeque<u64> {
        let entries = self.inner.entries.read().await;
        (self.lower_bound()..self.upper_bound())
            .rev()
            .filter(|&block_number| !entries.contains_key(&block_number))
            .collect()
    }

    /// Insert block data into the cache.
    async fn insert_blocks<I>(&self, blocks: I)
    where
        I: IntoIterator<Item = (SealedBlock, Arc<Vec<Receipt>>)>,
    {
        let mut entries = self.inner.entries.write().await;

        let percentiles = self.predefined_percentiles();
        // Insert all new blocks and calculate approximated rewards
        for (block, receipts) in blocks {
            let mut fee_history_entry = FeeHistoryEntry::new(&block);
            fee_history_entry.rewards = calculate_reward_percentiles_for_block(
                &percentiles,
                fee_history_entry.gas_used,
                fee_history_entry.base_fee_per_gas,
                &block.body,
                &receipts,
            )
            .unwrap_or_default();
            entries.insert(block.number, fee_history_entry);
        }

        // enforce bounds by popping the oldest entries
        while entries.len() > self.inner.config.max_blocks as usize {
            entries.pop_first();
        }

        if entries.len() == 0 {
            self.inner.upper_bound.store(0, SeqCst);
            self.inner.lower_bound.store(0, SeqCst);
            return
        }

        let upper_bound = *entries.last_entry().expect("Contains at least one entry").key();

        // also enforce proper lower bound in case we have gaps
        let target_lower = upper_bound.saturating_sub(self.inner.config.max_blocks);
        while entries.len() > 1 && *entries.first_key_value().unwrap().0 < target_lower {
            entries.pop_first();
        }

        let lower_bound = *entries.first_entry().expect("Contains at least one entry").key();
        self.inner.upper_bound.store(upper_bound, SeqCst);
        self.inner.lower_bound.store(lower_bound, SeqCst);
    }

    /// Get `UpperBound` value for `FeeHistoryCache`
    pub fn upper_bound(&self) -> u64 {
        self.inner.upper_bound.load(SeqCst)
    }

    /// Get `LowerBound` value for `FeeHistoryCache`
    pub fn lower_bound(&self) -> u64 {
        self.inner.lower_bound.load(SeqCst)
    }

    /// Collect fee history for given range.
    ///
    /// This function retrieves fee history entries from the cache for the specified range.
    /// If the requested range (`start_block` to `end_block`) is within the cache bounds,
    /// it returns the corresponding entries.
    /// Otherwise it returns None.
    pub async fn get_history(
        &self,
        start_block: u64,
        end_block: u64,
    ) -> Option<Vec<FeeHistoryEntry>> {
        let lower_bound = self.lower_bound();
        let upper_bound = self.upper_bound();
        if start_block >= lower_bound && end_block <= upper_bound {
            let entries = self.inner.entries.read().await;
            let result = entries
                .range(start_block..=end_block)
                .map(|(_, fee_entry)| fee_entry.clone())
                .collect::<Vec<_>>();

            if result.is_empty() {
                return None
            }

            Some(result)
        } else {
            None
        }
    }

    /// Generates predefined set of percentiles
    ///
    /// This returns 100 * resolution points
    pub fn predefined_percentiles(&self) -> Vec<f64> {
        let res = self.resolution() as f64;
        (0..=100 * self.resolution()).map(|p| p as f64 / res).collect()
    }
}

/// Settings for the [`FeeHistoryCache`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistoryCacheConfig {
    /// Max number of blocks in cache.
    ///
    /// Default is [`MAX_HEADER_HISTORY`] plus some change to also serve slightly older blocks from
    /// cache, since `fee_history` supports the entire range
    pub max_blocks: u64,
    /// Percentile approximation resolution
    ///
    /// Default is 4 which means 0.25
    pub resolution: u64,
}

impl Default for FeeHistoryCacheConfig {
    fn default() -> Self {
        Self { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
    }
}

/// Container type for shared state in [`FeeHistoryCache`]
#[derive(Debug)]
struct FeeHistoryCacheInner {
    /// Stores the lower bound of the cache
    lower_bound: AtomicU64,
    /// Stores the upper bound of the cache
    upper_bound: AtomicU64,
    /// Config for `FeeHistoryCache`, consists of resolution for percentile approximation
    /// and max number of blocks
    config: FeeHistoryCacheConfig,
    /// Stores the entries of the cache
    entries: tokio::sync::RwLock<BTreeMap<u64, FeeHistoryEntry>>,
    eth_cache: EthStateCache,
}

/// Awaits for new chain events and directly inserts them into the cache so they're available
/// immediately before they need to be fetched from disk.
pub async fn fee_history_cache_new_blocks_task<St, Provider>(
    fee_history_cache: FeeHistoryCache,
    mut events: St,
    provider: Provider,
) where
    St: Stream<Item = CanonStateNotification> + Unpin + 'static,
    Provider: BlockReaderIdExt + ChainSpecProvider + 'static,
{
    // We're listening for new blocks emitted when the node is in live sync.
    // If the node transitions to stage sync, we need to fetch the missing blocks
    let mut missing_blocks = VecDeque::new();
    let mut fetch_missing_block = Fuse::terminated();

    loop {
        if fetch_missing_block.is_terminated() {
            if let Some(block_number) = missing_blocks.pop_front() {
                trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
                if let Ok(Some(hash)) = provider.block_hash(block_number) {
                    // fetch missing block
                    fetch_missing_block = fee_history_cache
                        .inner
                        .eth_cache
                        .get_block_and_receipts(hash)
                        .boxed()
                        .fuse();
                }
            }
        }

        tokio::select! {
            res = &mut fetch_missing_block =>  {
                if let Ok(res) = res {
                    fee_history_cache.insert_blocks(res.into_iter()).await;
                }
            }
            event = events.next() =>  {
                let Some(event) = event else {
                     // the stream ended, we are done
                    break;
                };
                let (blocks, receipts): (Vec<_>, Vec<_>) = event
                    .committed()
                    .blocks_and_receipts()
                    .map(|(block, receipts)| {
                        (block.block.clone(), Arc::new(receipts.iter().flatten().cloned().collect::<Vec<_>>()))
                    })
                    .unzip();
                fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;

                // keep track of missing blocks
                missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
            }
        }
    }
}

/// Calculates reward percentiles for transactions in a block header.
/// Given a list of percentiles and a sealed block header, this function computes
/// the corresponding rewards for the transactions at each percentile.
///
/// The results are returned as a vector of U256 values.
pub fn calculate_reward_percentiles_for_block(
    percentiles: &[f64],
    gas_used: u64,
    base_fee_per_gas: u64,
    transactions: &[TransactionSigned],
    receipts: &[Receipt],
) -> Result<Vec<u128>, EthApiError> {
    let mut transactions = transactions
        .iter()
        .zip(receipts)
        .scan(0, |previous_gas, (tx, receipt)| {
            // Convert the cumulative gas used in the receipts
            // to the gas usage by the transaction
            //
            // While we will sum up the gas again later, it is worth
            // noting that the order of the transactions will be different,
            // so the sum will also be different for each receipt.
            let gas_used = receipt.cumulative_gas_used - *previous_gas;
            *previous_gas = receipt.cumulative_gas_used;

            Some(TxGasAndReward {
                gas_used,
                reward: tx.effective_tip_per_gas(Some(base_fee_per_gas)).unwrap_or_default(),
            })
        })
        .collect::<Vec<_>>();

    // Sort the transactions by their rewards in ascending order
    transactions.sort_by_key(|tx| tx.reward);

    // Find the transaction that corresponds to the given percentile
    //
    // We use a `tx_index` here that is shared across all percentiles, since we know
    // the percentiles are monotonically increasing.
    let mut tx_index = 0;
    let mut cumulative_gas_used = transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
    let mut rewards_in_block = Vec::new();
    for percentile in percentiles {
        // Empty blocks should return in a zero row
        if transactions.is_empty() {
            rewards_in_block.push(0);
            continue
        }

        let threshold = (gas_used as f64 * percentile / 100.) as u64;
        while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
            tx_index += 1;
            cumulative_gas_used += transactions[tx_index].gas_used;
        }
        rewards_in_block.push(transactions[tx_index].reward);
    }

    Ok(rewards_in_block)
}

/// A cached entry for a block's fee history.
#[derive(Debug, Clone)]
pub struct FeeHistoryEntry {
    /// The base fee per gas for this block.
    pub base_fee_per_gas: u64,
    /// Gas used ratio this block.
    pub gas_used_ratio: f64,
    /// The base per blob gas for EIP-4844.
    /// For pre EIP-4844 equals to zero.
    pub base_fee_per_blob_gas: Option<u128>,
    /// Blob gas used ratio for this block.
    ///
    /// Calculated as the ratio of blob gas used and the available blob data gas per block.
    /// Will be zero if no blob gas was used or pre EIP-4844.
    pub blob_gas_used_ratio: f64,
    /// The excess blob gas of the block.
    pub excess_blob_gas: Option<u64>,
    /// The total amount of blob gas consumed by the transactions within the block,
    /// added in EIP-4844
    pub blob_gas_used: Option<u64>,
    /// Gas used by this block.
    pub gas_used: u64,
    /// Gas limit by this block.
    pub gas_limit: u64,
    /// Hash of the block.
    pub header_hash: B256,
    /// Approximated rewards for the configured percentiles.
    pub rewards: Vec<u128>,
    /// The timestamp of the block.
    pub timestamp: u64,
}

impl FeeHistoryEntry {
    /// Creates a new entry from a sealed block.
    ///
    /// Note: This does not calculate the rewards for the block.
    pub fn new(block: &SealedBlock) -> Self {
        Self {
            base_fee_per_gas: block.base_fee_per_gas.unwrap_or_default(),
            gas_used_ratio: block.gas_used as f64 / block.gas_limit as f64,
            base_fee_per_blob_gas: block.blob_fee(),
            blob_gas_used_ratio: block.blob_gas_used() as f64 /
                reth_primitives::constants::eip4844::MAX_DATA_GAS_PER_BLOCK as f64,
            excess_blob_gas: block.excess_blob_gas,
            blob_gas_used: block.blob_gas_used,
            gas_used: block.gas_used,
            header_hash: block.hash(),
            gas_limit: block.gas_limit,
            rewards: Vec::new(),
            timestamp: block.timestamp,
        }
    }

    /// Returns the base fee for the next block according to the EIP-1559 spec.
    pub fn next_block_base_fee(&self, chain_spec: &ChainSpec) -> u64 {
        calc_next_block_base_fee(
            self.gas_used as u128,
            self.gas_limit as u128,
            self.base_fee_per_gas as u128,
            chain_spec.base_fee_params_at_timestamp(self.timestamp),
        ) as u64
    }

    /// Returns the blob fee for the next block according to the EIP-4844 spec.
    ///
    /// Returns `None` if `excess_blob_gas` is None.
    ///
    /// See also [`Self::next_block_excess_blob_gas`]
    pub fn next_block_blob_fee(&self) -> Option<u128> {
        self.next_block_excess_blob_gas().map(calc_blob_gasprice)
    }

    /// Calculate excess blob gas for the next block according to the EIP-4844 spec.
    ///
    /// Returns a `None` if no excess blob gas is set, no EIP-4844 support
    pub fn next_block_excess_blob_gas(&self) -> Option<u64> {
        Some(calculate_excess_blob_gas(self.excess_blob_gas?, self.blob_gas_used?))
    }
}