1use std::{
4 collections::{BTreeMap, VecDeque},
5 fmt::Debug,
6 sync::{atomic::Ordering::SeqCst, Arc},
7};
8
9use alloy_consensus::{BlockHeader, Header, Transaction, TxReceipt};
10use alloy_eips::eip7840::BlobParams;
11use alloy_rpc_types_eth::TxGasAndReward;
12use futures::{
13 future::{Fuse, FusedFuture},
14 FutureExt, Stream, StreamExt,
15};
16use metrics::atomics::AtomicU64;
17use reth_chain_state::CanonStateNotification;
18use reth_chainspec::{ChainSpecProvider, EthChainSpec};
19use reth_primitives_traits::{Block, BlockBody, NodePrimitives, SealedBlock};
20use reth_rpc_server_types::constants::gas_oracle::MAX_HEADER_HISTORY;
21use reth_storage_api::BlockReaderIdExt;
22use serde::{Deserialize, Serialize};
23use tracing::trace;
24
25use crate::utils::checked_blob_gas_used_ratio;
26
27use super::{EthApiError, EthStateCache};
28
/// Handle to a cache of per-block fee history entries, keyed by block number.
///
/// The state lives behind an [`Arc`], so cloning the handle shares the same underlying cache
/// instead of copying it.
#[derive(Debug, Clone)]
pub struct FeeHistoryCache<H> {
    /// Shared state: cache bounds, configuration and the cached entries.
    inner: Arc<FeeHistoryCacheInner<H>>,
}
36
37impl<H> FeeHistoryCache<H>
38where
39 H: BlockHeader + Clone,
40{
41 pub fn new(config: FeeHistoryCacheConfig) -> Self {
43 let inner = FeeHistoryCacheInner {
44 lower_bound: Default::default(),
45 upper_bound: Default::default(),
46 config,
47 entries: Default::default(),
48 };
49 Self { inner: Arc::new(inner) }
50 }
51
52 #[inline]
54 pub fn config(&self) -> &FeeHistoryCacheConfig {
55 &self.inner.config
56 }
57
58 #[inline]
60 pub fn resolution(&self) -> u64 {
61 self.config().resolution
62 }
63
64 async fn missing_consecutive_blocks(&self) -> VecDeque<u64> {
70 let entries = self.inner.entries.read().await;
71 (self.lower_bound()..self.upper_bound())
72 .rev()
73 .filter(|&block_number| !entries.contains_key(&block_number))
74 .collect()
75 }
76
77 async fn insert_blocks<'a, I, B, R, C>(&self, blocks: I, chain_spec: &C)
79 where
80 B: Block<Header = H> + 'a,
81 R: TxReceipt + 'a,
82 I: IntoIterator<Item = (&'a SealedBlock<B>, &'a [R])>,
83 C: EthChainSpec,
84 {
85 let mut entries = self.inner.entries.write().await;
86
87 let percentiles = self.predefined_percentiles();
88 for (block, receipts) in blocks {
90 let mut fee_history_entry = FeeHistoryEntry::<H>::new(
91 block,
92 chain_spec.blob_params_at_timestamp(block.header().timestamp()),
93 );
94 fee_history_entry.rewards = calculate_reward_percentiles_for_block(
95 &percentiles,
96 fee_history_entry.header.gas_used(),
97 fee_history_entry.header.base_fee_per_gas().unwrap_or_default(),
98 block.body().transactions(),
99 receipts,
100 )
101 .unwrap_or_default();
102 entries.insert(block.number(), fee_history_entry);
103 }
104
105 while entries.len() > self.inner.config.max_blocks as usize {
107 entries.pop_first();
108 }
109
110 if entries.is_empty() {
111 self.inner.upper_bound.store(0, SeqCst);
112 self.inner.lower_bound.store(0, SeqCst);
113 return
114 }
115
116 let upper_bound = *entries.last_entry().expect("Contains at least one entry").key();
117
118 let target_lower = upper_bound.saturating_sub(self.inner.config.max_blocks);
120 while entries.len() > 1 && *entries.first_key_value().unwrap().0 < target_lower {
121 entries.pop_first();
122 }
123
124 let lower_bound = *entries.first_entry().expect("Contains at least one entry").key();
125 self.inner.upper_bound.store(upper_bound, SeqCst);
126 self.inner.lower_bound.store(lower_bound, SeqCst);
127 }
128
129 pub fn upper_bound(&self) -> u64 {
131 self.inner.upper_bound.load(SeqCst)
132 }
133
134 pub fn lower_bound(&self) -> u64 {
136 self.inner.lower_bound.load(SeqCst)
137 }
138
139 pub async fn get_history(
146 &self,
147 start_block: u64,
148 end_block: u64,
149 ) -> Option<Vec<FeeHistoryEntry<H>>> {
150 if end_block < start_block {
151 return None
153 }
154 let lower_bound = self.lower_bound();
155 let upper_bound = self.upper_bound();
156 if start_block >= lower_bound && end_block <= upper_bound {
157 let entries = self.inner.entries.read().await;
158 let result = entries
159 .range(start_block..=end_block)
160 .map(|(_, fee_entry)| fee_entry.clone())
161 .collect::<Vec<_>>();
162
163 if result.is_empty() {
164 return None
165 }
166
167 Some(result)
168 } else {
169 None
170 }
171 }
172
173 pub fn predefined_percentiles(&self) -> Vec<f64> {
177 let res = self.resolution() as f64;
178 (0..=100 * self.resolution()).map(|p| p as f64 / res).collect()
179 }
180}
181
/// Settings for the [`FeeHistoryCache`].
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistoryCacheConfig {
    /// Maximum number of block entries kept in the cache.
    ///
    /// Defaults to `MAX_HEADER_HISTORY + 100`.
    pub max_blocks: u64,
    /// Number of percentile steps per percentage point used when precomputing reward
    /// percentiles (e.g. a resolution of `4` yields steps of `0.25`).
    ///
    /// Defaults to `4`.
    pub resolution: u64,
}
196
197impl Default for FeeHistoryCacheConfig {
198 fn default() -> Self {
199 Self { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
200 }
201}
202
/// State shared by all clones of a [`FeeHistoryCache`].
#[derive(Debug)]
struct FeeHistoryCacheInner<H> {
    /// Lowest block number currently held in `entries` (`0` when the cache is empty).
    lower_bound: AtomicU64,
    /// Highest block number currently held in `entries` (`0` when the cache is empty).
    upper_bound: AtomicU64,
    /// Cache settings (capacity and percentile resolution).
    config: FeeHistoryCacheConfig,
    /// Cached fee history entries, keyed by block number.
    entries: tokio::sync::RwLock<BTreeMap<u64, FeeHistoryEntry<H>>>,
}
216
217pub async fn fee_history_cache_new_blocks_task<St, Provider, N>(
220 fee_history_cache: FeeHistoryCache<N::BlockHeader>,
221 mut events: St,
222 provider: Provider,
223 cache: EthStateCache<N>,
224) where
225 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
226 Provider:
227 BlockReaderIdExt<Block = N::Block, Receipt = N::Receipt> + ChainSpecProvider + 'static,
228 N: NodePrimitives,
229 N::BlockHeader: BlockHeader + Clone,
230{
231 let mut missing_blocks = VecDeque::new();
234 let mut fetch_missing_block = Fuse::terminated();
235
236 loop {
237 if fetch_missing_block.is_terminated() &&
238 let Some(block_number) = missing_blocks.pop_front()
239 {
240 trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
241 if let Ok(Some(hash)) = provider.block_hash(block_number) {
242 fetch_missing_block = cache.get_block_and_receipts(hash).boxed().fuse();
244 }
245 }
246
247 let chain_spec = provider.chain_spec();
248
249 tokio::select! {
250 res = &mut fetch_missing_block => {
251 if let Ok(res) = res {
252 let res = res.as_ref()
253 .map(|(b, r)| (b.sealed_block(), r.as_slice()));
254 fee_history_cache.insert_blocks(res, &chain_spec).await;
255 }
256 }
257 event = events.next() => {
258 let Some(event) = event else {
259 break
261 };
262
263 let committed = event.committed();
264 let blocks_and_receipts = committed
265 .blocks_and_receipts()
266 .map(|(block, receipts)| {
267 (block.sealed_block(), receipts.as_slice())
268 });
269 fee_history_cache.insert_blocks(blocks_and_receipts, &chain_spec).await;
270
271 missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
273 }
274 }
275 }
276}
277
278pub fn calculate_reward_percentiles_for_block<T, R>(
284 percentiles: &[f64],
285 gas_used: u64,
286 base_fee_per_gas: u64,
287 transactions: &[T],
288 receipts: &[R],
289) -> Result<Vec<u128>, EthApiError>
290where
291 T: Transaction,
292 R: TxReceipt,
293{
294 let mut transactions = transactions
295 .iter()
296 .zip(receipts)
297 .scan(0, |previous_gas, (tx, receipt)| {
298 let gas_used = receipt.cumulative_gas_used() - *previous_gas;
305 *previous_gas = receipt.cumulative_gas_used();
306
307 Some(TxGasAndReward {
308 gas_used,
309 reward: tx.effective_tip_per_gas(base_fee_per_gas).unwrap_or_default(),
310 })
311 })
312 .collect::<Vec<_>>();
313
314 transactions.sort_by_key(|tx| tx.reward);
316
317 let mut tx_index = 0;
322 let mut cumulative_gas_used = transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
323 let mut rewards_in_block = Vec::with_capacity(percentiles.len());
324 for percentile in percentiles {
325 if transactions.is_empty() {
327 rewards_in_block.push(0);
328 continue
329 }
330
331 let threshold = (gas_used as f64 * percentile / 100.) as u64;
332 while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
333 tx_index += 1;
334 cumulative_gas_used += transactions[tx_index].gas_used;
335 }
336 rewards_in_block.push(transactions[tx_index].reward);
337 }
338
339 Ok(rewards_in_block)
340}
341
/// Fee-related data of a single block as stored in the fee history cache.
#[derive(Debug, Clone)]
pub struct FeeHistoryEntry<H = Header> {
    /// Copy of the block's header.
    pub header: H,
    /// Gas used by the block divided by its gas limit.
    pub gas_used_ratio: f64,
    /// Blob base fee derived from the header's excess blob gas and `blob_params`.
    ///
    /// `None` if the header carries no excess blob gas or no blob parameters are available.
    pub base_fee_per_blob_gas: Option<u128>,
    /// Blob gas used by the block relative to the maximum blob gas per block, as computed by
    /// `checked_blob_gas_used_ratio`.
    pub blob_gas_used_ratio: f64,
    /// Rewards at the cache's predefined percentiles.
    ///
    /// Empty right after construction; populated when the entry is inserted into the cache.
    pub rewards: Vec<u128>,
    /// Blob parameters supplied for this block, if any.
    pub blob_params: Option<BlobParams>,
}
362
363impl<H> FeeHistoryEntry<H>
364where
365 H: BlockHeader + Clone,
366{
367 pub fn new<B>(block: &SealedBlock<B>, blob_params: Option<BlobParams>) -> Self
371 where
372 B: Block<Header = H>,
373 {
374 let header = block.header();
375 Self {
376 header: block.header().clone(),
377 gas_used_ratio: header.gas_used() as f64 / header.gas_limit() as f64,
378 base_fee_per_blob_gas: header
379 .excess_blob_gas()
380 .and_then(|excess_blob_gas| Some(blob_params?.calc_blob_fee(excess_blob_gas))),
381 blob_gas_used_ratio: checked_blob_gas_used_ratio(
382 block.body().blob_gas_used(),
383 blob_params
384 .as_ref()
385 .map(|params| params.max_blob_gas_per_block())
386 .unwrap_or(alloy_eips::eip4844::MAX_DATA_GAS_PER_BLOCK_DENCUN),
387 ),
388 rewards: Vec::new(),
389 blob_params,
390 }
391 }
392
393 pub fn next_block_blob_fee(&self) -> Option<u128> {
399 self.next_block_excess_blob_gas()
400 .and_then(|excess_blob_gas| Some(self.blob_params?.calc_blob_fee(excess_blob_gas)))
401 }
402
403 pub fn next_block_excess_blob_gas(&self) -> Option<u64> {
407 self.header.excess_blob_gas().and_then(|excess_blob_gas| {
408 Some(self.blob_params?.next_block_excess_blob_gas_osaka(
409 excess_blob_gas,
410 self.header.blob_gas_used()?,
411 self.header.base_fee_per_gas()?,
412 ))
413 })
414 }
415}