reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip7840::BlobParams;
8use alloy_primitives::{B256, U256};
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use futures::Future;
11use reth_chain_state::{BlockState, ComputedTrieData, ExecutedBlock};
12use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
13use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError, RethError};
14use reth_evm::{
15 block::TxResult,
16 execute::{BlockBuilder, BlockBuilderOutcome, BlockExecutionOutput},
17 ConfigureEvm, Evm, EvmEnvFor, NextBlockEnvAttributes,
18};
19use reth_primitives_traits::{transaction::error::InvalidTransactionError, HeaderTy, SealedHeader};
20use reth_revm::{database::StateProviderDatabase, db::State};
21use reth_rpc_convert::RpcConvert;
22use reth_rpc_eth_types::{
23 block::BlockAndReceipts, builder::config::PendingBlockKind, EthApiError, PendingBlock,
24 PendingBlockEnv, PendingBlockEnvOrigin,
25};
26use reth_storage_api::{
27 noop::NoopProvider, BlockReader, BlockReaderIdExt, ProviderHeader, ProviderTx,
28 StateProviderBox, StateProviderFactory,
29};
30use reth_transaction_pool::{
31 error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes,
32 PoolTransaction, TransactionPool,
33};
34use revm::context_interface::{Block, Cfg as _};
35use std::{
36 sync::Arc,
37 time::{Duration, Instant},
38};
39use tokio::sync::Mutex;
40use tracing::debug;
41
42pub trait LoadPendingBlock:
46 EthApiTypes<
47 Error: FromEvmError<Self::Evm>,
48 RpcConvert: RpcConvert<Network = Self::NetworkTypes>,
49 > + RpcNodeCore
50{
    /// Returns the mutex-guarded slot that caches the most recently built pending block, if any.
    ///
    /// The default methods of this trait read the cached block from this slot and overwrite it
    /// whenever a new pending block is built.
    fn pending_block(&self) -> &Mutex<Option<PendingBlock<Self::Primitives>>>;

    /// Returns the [`PendingEnvBuilder`] used to derive the next-block environment attributes.
    fn pending_env_builder(&self) -> &dyn PendingEnvBuilder<Self::Evm>;

    /// Returns the configured [`PendingBlockKind`].
    ///
    /// The default methods of this trait skip building a local pending block when the kind
    /// reports `is_none()`, and skip pool transactions when it reports `is_empty()`.
    fn pending_block_kind(&self) -> PendingBlockKind;
61
62 fn pending_block_env_and_cfg(&self) -> Result<PendingBlockEnv<Self::Evm>, Self::Error> {
66 if let Some((block, receipts)) =
67 self.provider().pending_block_and_receipts().map_err(Self::Error::from_eth_err)?
68 {
69 let evm_env = self
73 .evm_config()
74 .evm_env(block.header())
75 .map_err(RethError::other)
76 .map_err(Self::Error::from_eth_err)?;
77
78 return Ok(PendingBlockEnv::new(
79 evm_env,
80 PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
81 ));
82 }
83
84 let latest = self
87 .provider()
88 .latest_header()
89 .map_err(Self::Error::from_eth_err)?
90 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
91
92 let evm_env = self
93 .evm_config()
94 .next_evm_env(&latest, &self.next_env_attributes(&latest)?)
95 .map_err(RethError::other)
96 .map_err(Self::Error::from_eth_err)?;
97
98 Ok(PendingBlockEnv::new(evm_env, PendingBlockEnvOrigin::DerivedFromLatest(latest)))
99 }
100
101 fn next_env_attributes(
103 &self,
104 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
105 ) -> Result<<Self::Evm as ConfigureEvm>::NextBlockEnvCtx, Self::Error> {
106 Ok(self.pending_env_builder().pending_env_attributes(parent)?)
107 }
108
109 fn local_pending_state(
111 &self,
112 ) -> impl Future<Output = Result<Option<StateProviderBox>, Self::Error>> + Send
113 where
114 Self: SpawnBlocking,
115 {
116 async move {
117 let Some(pending_block) = self.pool_pending_block().await? else {
118 return Ok(None);
119 };
120
121 let latest_historical = self
122 .provider()
123 .history_by_block_hash(pending_block.block().parent_hash())
124 .map_err(Self::Error::from_eth_err)?;
125
126 let state = BlockState::from(pending_block);
127
128 Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
129 }
130 }
131
132 fn pool_pending_block(
134 &self,
135 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
136 where
137 Self: SpawnBlocking,
138 {
139 async move {
140 if self.pending_block_kind().is_none() {
141 return Ok(None);
142 }
143 let pending = self.pending_block_env_and_cfg()?;
144 let parent = match pending.origin {
145 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
146 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
147 };
148
149 self.build_pool_pending_block(parent, pending.evm_env).await
150 }
151 }
152
153 fn build_pool_pending_block(
158 &self,
159 parent: SealedHeader<ProviderHeader<Self::Provider>>,
160 evm_env: EvmEnvFor<Self::Evm>,
161 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
162 where
163 Self: SpawnBlocking,
164 {
165 async move {
166 let mut lock = self.pending_block().lock().await;
168
169 let now = Instant::now();
170
171 if let Some(pending_block) = lock.as_ref() {
173 if evm_env.block_env.number() == U256::from(pending_block.block().number()) &&
175 parent.hash() == pending_block.block().parent_hash() &&
176 now <= pending_block.expires_at
177 {
178 return Ok(Some(pending_block.clone()));
179 }
180 }
181
182 let executed_block = match self
183 .spawn_blocking_io(move |this| {
184 this.build_block(&parent)
186 })
187 .await
188 {
189 Ok(block) => block,
190 Err(err) => {
191 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
192 return Ok(None)
193 }
194 };
195
196 let pending = PendingBlock::with_executed_block(
197 Instant::now() + Duration::from_secs(1),
198 executed_block,
199 );
200
201 *lock = Some(pending.clone());
202
203 Ok(Some(pending))
204 }
205 }
206
207 fn local_pending_block(
209 &self,
210 ) -> impl Future<Output = Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error>> + Send
211 where
212 Self: SpawnBlocking,
213 Self::Pool:
214 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
215 {
216 async move {
217 if self.pending_block_kind().is_none() {
218 return Ok(None);
219 }
220
221 let pending = self.pending_block_env_and_cfg()?;
222
223 Ok(match pending.origin {
224 PendingBlockEnvOrigin::ActualPending(block, receipts) => {
225 Some(BlockAndReceipts { block, receipts })
226 }
227 PendingBlockEnvOrigin::DerivedFromLatest(parent) => self
228 .build_pool_pending_block(parent, pending.evm_env)
229 .await?
230 .map(PendingBlock::into_block_and_receipts),
231 })
232 }
233 }
234
235 fn build_block(
243 &self,
244 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
245 ) -> Result<ExecutedBlock<Self::Primitives>, Self::Error>
246 where
247 Self::Pool:
248 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
249 EthApiError: From<ProviderError>,
250 {
251 let state_provider = self
252 .provider()
253 .history_by_block_hash(parent.hash())
254 .map_err(Self::Error::from_eth_err)?;
255 let state = StateProviderDatabase::new(state_provider);
256 let mut db = State::builder().with_database(state).with_bundle_update().build();
257
258 let mut builder = self
259 .evm_config()
260 .builder_for_next_block(&mut db, parent, self.next_env_attributes(parent)?)
261 .map_err(RethError::other)
262 .map_err(Self::Error::from_eth_err)?;
263
264 builder.apply_pre_execution_changes().map_err(Self::Error::from_eth_err)?;
265
266 let block_gas_limit: u64 = builder.evm().block().gas_limit();
267 let is_amsterdam = self
268 .provider()
269 .chain_spec()
270 .is_amsterdam_active_at_timestamp(builder.evm().block().timestamp().saturating_to());
271 let basefee = builder.evm().block().basefee();
272 let blob_gasprice = builder.evm().block().blob_gasprice().map(|p| p as u64);
273
274 let blob_params = self
275 .provider()
276 .chain_spec()
277 .blob_params_at_timestamp(parent.timestamp())
278 .unwrap_or_else(BlobParams::cancun);
279 let mut cumulative_tx_gas_used = 0;
280 let mut block_regular_gas_used = 0;
281 let mut block_state_gas_used = 0;
282 let mut sum_blob_gas_used = 0;
283 let tx_gas_limit_cap = builder.evm().cfg_env().tx_gas_limit_cap();
284
285 if !self.pending_block_kind().is_empty() {
287 let mut best_txs = self
288 .pool()
289 .best_transactions_with_attributes(BestTransactionsAttributes::new(
290 basefee,
291 blob_gasprice,
292 ))
293 .without_updates();
295
296 while let Some(pool_tx) = best_txs.next() {
297 let exceeds_gas_limit = if is_amsterdam {
299 let regular_available_gas =
300 block_gas_limit.saturating_sub(block_regular_gas_used);
301 let state_available_gas = block_gas_limit.saturating_sub(block_state_gas_used);
302 let regular_tx_gas_limit = pool_tx.gas_limit().min(tx_gas_limit_cap);
303
304 if regular_tx_gas_limit > regular_available_gas {
305 Some((regular_tx_gas_limit, regular_available_gas))
306 } else if pool_tx.gas_limit() > state_available_gas {
307 Some((pool_tx.gas_limit(), state_available_gas))
308 } else {
309 None
310 }
311 } else {
312 let block_available_gas =
313 block_gas_limit.saturating_sub(cumulative_tx_gas_used);
314 (pool_tx.gas_limit() > block_available_gas)
315 .then_some((pool_tx.gas_limit(), block_available_gas))
316 };
317
318 if let Some((transaction_gas_limit, block_available_gas)) = exceeds_gas_limit {
319 best_txs.mark_invalid(
323 &pool_tx,
324 InvalidPoolTransactionError::ExceedsGasLimit(
325 transaction_gas_limit,
326 block_available_gas,
327 ),
328 );
329 continue
330 }
331
332 if pool_tx.origin.is_private() {
333 best_txs.mark_invalid(
337 &pool_tx,
338 InvalidPoolTransactionError::Consensus(
339 InvalidTransactionError::TxTypeNotSupported,
340 ),
341 );
342 continue
343 }
344
345 let tx = pool_tx.to_consensus();
347
348 let tx_blob_gas = tx.blob_gas_used();
351 if let Some(tx_blob_gas) = tx_blob_gas &&
352 sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
353 {
354 best_txs.mark_invalid(
359 &pool_tx,
360 InvalidPoolTransactionError::ExceedsGasLimit(
361 tx_blob_gas,
362 blob_params.max_blob_gas_per_block(),
363 ),
364 );
365 continue
366 }
367
368 let mut tx_regular_gas_used = 0;
369 let gas_output =
370 match builder.execute_transaction_with_result_closure(tx, |result| {
371 tx_regular_gas_used = result.result().result.gas().block_regular_gas_used();
372 }) {
373 Ok(gas_output) => gas_output,
374 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
375 error,
376 ..
377 })) => {
378 if error.is_nonce_too_low() {
379 } else {
381 best_txs.mark_invalid(
384 &pool_tx,
385 InvalidPoolTransactionError::Consensus(
386 InvalidTransactionError::TxTypeNotSupported,
387 ),
388 );
389 }
390 continue
391 }
392 Err(BlockExecutionError::Validation(
393 BlockValidationError::TransactionGasLimitMoreThanAvailableBlockGas {
394 transaction_gas_limit,
395 block_available_gas,
396 },
397 )) => {
398 best_txs.mark_invalid(
399 &pool_tx,
400 InvalidPoolTransactionError::ExceedsGasLimit(
401 transaction_gas_limit,
402 block_available_gas,
403 ),
404 );
405 continue
406 }
407 Err(err) => return Err(Self::Error::from_eth_err(err)),
409 };
410
411 if let Some(tx_blob_gas) = tx_blob_gas {
413 sum_blob_gas_used += tx_blob_gas;
414
415 if sum_blob_gas_used == blob_params.max_blob_gas_per_block() {
417 best_txs.skip_blobs();
418 }
419 }
420
421 let gas_used = gas_output.tx_gas_used();
423 cumulative_tx_gas_used += gas_used;
424 block_regular_gas_used += tx_regular_gas_used;
425 block_state_gas_used += gas_output.state_gas_used();
426 }
427 }
428
429 let BlockBuilderOutcome { execution_result, block, hashed_state, trie_updates, .. } =
430 builder.finish(NoopProvider::default(), None).map_err(Self::Error::from_eth_err)?;
431
432 let execution_outcome =
433 BlockExecutionOutput { state: db.take_bundle(), result: execution_result };
434
435 Ok(ExecutedBlock::new(
436 block.into(),
437 Arc::new(execution_outcome),
438 ComputedTrieData::without_trie_input(
439 Arc::new(hashed_state.into_sorted()),
440 Arc::new(trie_updates.into_sorted()),
441 ),
442 ))
443 }
444}
445
/// Builds the [`ConfigureEvm::NextBlockEnvCtx`] for a pending block from its parent header.
pub trait PendingEnvBuilder<Evm: ConfigureEvm>: Send + Sync + Unpin + 'static {
    /// Returns the next-block context attributes derived from the given `parent` header.
    ///
    /// # Errors
    ///
    /// Returns an [`EthApiError`] if the attributes cannot be produced.
    fn pending_env_attributes(
        &self,
        parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
    ) -> Result<Evm::NextBlockEnvCtx, EthApiError>;
}
454
/// Constructs next-block environment attributes from a parent header alone.
///
/// The `()` implementation of [`PendingEnvBuilder`] delegates to this trait.
pub trait BuildPendingEnv<Header> {
    /// Creates the attributes for the block that follows `parent`.
    fn build_pending_env(parent: &SealedHeader<Header>) -> Self;
}
464
465impl<Evm> PendingEnvBuilder<Evm> for ()
466where
467 Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<Evm::Primitives>>>,
468{
469 fn pending_env_attributes(
470 &self,
471 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
472 ) -> Result<Evm::NextBlockEnvCtx, EthApiError> {
473 Ok(Evm::NextBlockEnvCtx::build_pending_env(parent))
474 }
475}
476
477impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
478 fn build_pending_env(parent: &SealedHeader<H>) -> Self {
479 Self {
480 timestamp: parent.timestamp().saturating_add(12),
481 suggested_fee_recipient: parent.beneficiary(),
482 prev_randao: B256::random(),
483 gas_limit: parent.gas_limit(),
484 parent_beacon_block_root: parent.parent_beacon_block_root(),
485 withdrawals: parent.withdrawals_root().map(|_| Default::default()),
486 extra_data: parent.extra_data().clone(),
487 slot_number: parent.slot_number().map(|slot| slot.saturating_add(1)),
488 }
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_primitives_traits::SealedHeader;

    /// The parent's beacon block root must be carried over unchanged.
    #[test]
    fn pending_env_keeps_parent_beacon_root() {
        let beacon_root = B256::repeat_byte(0x42);
        let parent = SealedHeader::new(
            Header { parent_beacon_block_root: Some(beacon_root), ..Default::default() },
            B256::ZERO,
        );

        let attrs = NextBlockEnvAttributes::build_pending_env(&parent);

        assert_eq!(attrs.parent_beacon_block_root, Some(beacon_root));
    }

    /// The pending block's slot number is the parent's slot number plus one.
    #[test]
    fn pending_env_increments_parent_slot_number() {
        let parent =
            SealedHeader::new(Header { slot_number: Some(7), ..Default::default() }, B256::ZERO);

        let attrs = NextBlockEnvAttributes::build_pending_env(&parent);

        assert_eq!(attrs.slot_number, Some(8));
    }
}