reth_rpc_eth_api/helpers/
pending_block.rsuse super::SpawnBlocking;
use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
use alloy_consensus::{BlockHeader, Transaction};
use alloy_eips::eip4844::MAX_DATA_GAS_PER_BLOCK;
use alloy_network::Network;
use alloy_primitives::B256;
use alloy_rpc_types_eth::BlockNumberOrTag;
use futures::Future;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_errors::RethError;
use reth_evm::{
env::EvmEnv, state_change::post_block_withdrawals_balance_increments,
system_calls::SystemCaller, ConfigureEvm, ConfigureEvmEnv, NextBlockEnvAttributes,
};
use reth_primitives::{BlockExt, InvalidTransactionError, SealedBlockWithSenders};
use reth_primitives_traits::Receipt;
use reth_provider::{
BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, ProviderBlock, ProviderError,
ProviderHeader, ProviderReceipt, ProviderTx, ReceiptProvider, StateProviderFactory,
};
use reth_revm::{
database::StateProviderDatabase,
primitives::{
BlockEnv, CfgEnvWithHandlerCfg, EVMError, Env, ExecutionResult, InvalidTransaction,
ResultAndState,
},
};
use reth_rpc_eth_types::{EthApiError, PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin};
use reth_transaction_pool::{
error::InvalidPoolTransactionError, BestTransactionsAttributes, PoolTransaction,
TransactionPool,
};
use revm::{db::states::bundle_state::BundleRetention, DatabaseCommit, State};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::debug;
pub trait LoadPendingBlock:
EthApiTypes<
NetworkTypes: Network<
HeaderResponse = alloy_rpc_types_eth::Header<ProviderHeader<Self::Provider>>,
>,
> + RpcNodeCore<
Provider: BlockReaderIdExt<Receipt: Receipt>
+ EvmEnvProvider<ProviderHeader<Self::Provider>>
+ ChainSpecProvider<ChainSpec: EthChainSpec + EthereumHardforks>
+ StateProviderFactory,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
Evm: ConfigureEvm<
Header = ProviderHeader<Self::Provider>,
Transaction = ProviderTx<Self::Provider>,
>,
>
{
#[expect(clippy::type_complexity)]
fn pending_block(
&self,
) -> &Mutex<Option<PendingBlock<ProviderBlock<Self::Provider>, ProviderReceipt<Self::Provider>>>>;
#[expect(clippy::type_complexity)]
fn pending_block_env_and_cfg(
&self,
) -> Result<
PendingBlockEnv<ProviderBlock<Self::Provider>, ProviderReceipt<Self::Provider>>,
Self::Error,
> {
if let Some(block) =
self.provider().pending_block_with_senders().map_err(Self::Error::from_eth_err)?
{
if let Some(receipts) = self
.provider()
.receipts_by_block(block.hash().into())
.map_err(Self::Error::from_eth_err)?
{
let evm_env = self
.provider()
.env_with_header(block.header(), self.evm_config().clone())
.map_err(Self::Error::from_eth_err)?;
let EvmEnv { cfg_env_with_handler_cfg, block_env } = evm_env;
return Ok(PendingBlockEnv::new(
cfg_env_with_handler_cfg,
block_env,
PendingBlockEnvOrigin::ActualPending(block, receipts),
));
}
}
let latest = self
.provider()
.latest_header()
.map_err(Self::Error::from_eth_err)?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let EvmEnv { cfg_env_with_handler_cfg, block_env } = self
.evm_config()
.next_cfg_and_block_env(
&latest,
NextBlockEnvAttributes {
timestamp: latest.timestamp() + 12,
suggested_fee_recipient: latest.beneficiary(),
prev_randao: B256::random(),
gas_limit: latest.gas_limit(),
},
)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
Ok(PendingBlockEnv::new(
cfg_env_with_handler_cfg,
block_env,
PendingBlockEnvOrigin::DerivedFromLatest(latest.hash()),
))
}
#[expect(clippy::type_complexity)]
fn local_pending_block(
&self,
) -> impl Future<
Output = Result<
Option<(
SealedBlockWithSenders<<Self::Provider as BlockReader>::Block>,
Vec<ProviderReceipt<Self::Provider>>,
)>,
Self::Error,
>,
> + Send
where
Self: SpawnBlocking,
{
async move {
let pending = self.pending_block_env_and_cfg()?;
let parent_hash = match pending.origin {
PendingBlockEnvOrigin::ActualPending(block, receipts) => {
return Ok(Some((block, receipts)));
}
PendingBlockEnvOrigin::DerivedFromLatest(parent_hash) => parent_hash,
};
let mut lock = self.pending_block().lock().await;
let now = Instant::now();
if let Some(pending_block) = lock.as_ref() {
if pending.block_env.number.to::<u64>() == pending_block.block.number() &&
parent_hash == pending_block.block.parent_hash() &&
now <= pending_block.expires_at
{
return Ok(Some((pending_block.block.clone(), pending_block.receipts.clone())));
}
}
let (sealed_block, receipts) = match self
.spawn_blocking_io(move |this| {
this.build_block(pending.cfg, pending.block_env, parent_hash)
})
.await
{
Ok(block) => block,
Err(err) => {
debug!(target: "rpc", "Failed to build pending block: {:?}", err);
return Ok(None)
}
};
let now = Instant::now();
*lock = Some(PendingBlock::new(
now + Duration::from_secs(1),
sealed_block.clone(),
receipts.clone(),
));
Ok(Some((sealed_block, receipts)))
}
}
fn assemble_receipt(
&self,
tx: &ProviderTx<Self::Provider>,
result: ExecutionResult,
cumulative_gas_used: u64,
) -> ProviderReceipt<Self::Provider>;
fn assemble_block(
&self,
block_env: &BlockEnv,
parent_hash: revm_primitives::B256,
state_root: revm_primitives::B256,
transactions: Vec<ProviderTx<Self::Provider>>,
receipts: &[ProviderReceipt<Self::Provider>],
) -> ProviderBlock<Self::Provider>;
fn assemble_block_and_receipts(
&self,
block_env: &BlockEnv,
parent_hash: revm_primitives::B256,
state_root: revm_primitives::B256,
transactions: Vec<ProviderTx<Self::Provider>>,
results: Vec<ExecutionResult>,
) -> (ProviderBlock<Self::Provider>, Vec<ProviderReceipt<Self::Provider>>) {
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(results.len());
for (tx, outcome) in transactions.iter().zip(results) {
cumulative_gas_used += outcome.gas_used();
receipts.push(self.assemble_receipt(tx, outcome, cumulative_gas_used));
}
let block =
self.assemble_block(block_env, parent_hash, state_root, transactions, &receipts);
(block, receipts)
}
#[expect(clippy::type_complexity)]
fn build_block(
&self,
cfg: CfgEnvWithHandlerCfg,
block_env: BlockEnv,
parent_hash: B256,
) -> Result<
(
SealedBlockWithSenders<ProviderBlock<Self::Provider>>,
Vec<ProviderReceipt<Self::Provider>>,
),
Self::Error,
>
where
EthApiError: From<ProviderError>,
{
let state_provider = self
.provider()
.history_by_block_hash(parent_hash)
.map_err(Self::Error::from_eth_err)?;
let state = StateProviderDatabase::new(state_provider);
let mut db = State::builder().with_database(state).with_bundle_update().build();
let mut cumulative_gas_used = 0;
let mut sum_blob_gas_used = 0;
let block_gas_limit: u64 = block_env.gas_limit.to::<u64>();
let base_fee = block_env.basefee.to::<u64>();
let mut executed_txs = Vec::new();
let mut senders = Vec::new();
let mut best_txs =
self.pool().best_transactions_with_attributes(BestTransactionsAttributes::new(
base_fee,
block_env.get_blob_gasprice().map(|gasprice| gasprice as u64),
));
let chain_spec = self.provider().chain_spec();
let mut system_caller = SystemCaller::new(self.evm_config().clone(), chain_spec.clone());
system_caller
.pre_block_blockhashes_contract_call(&mut db, &cfg, &block_env, parent_hash)
.map_err(|err| EthApiError::Internal(err.into()))?;
let mut results = Vec::new();
while let Some(pool_tx) = best_txs.next() {
if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::ExceedsGasLimit(
pool_tx.gas_limit(),
block_gas_limit,
),
);
continue
}
if pool_tx.origin.is_private() {
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::Consensus(
InvalidTransactionError::TxTypeNotSupported,
),
);
continue
}
let tx = pool_tx.to_consensus();
if let Some(tx_blob_gas) = tx.blob_gas_used() {
if sum_blob_gas_used + tx_blob_gas > MAX_DATA_GAS_PER_BLOCK {
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::ExceedsGasLimit(
tx_blob_gas,
MAX_DATA_GAS_PER_BLOCK,
),
);
continue
}
}
let env = Env::boxed(
cfg.cfg_env.clone(),
block_env.clone(),
Self::evm_config(self).tx_env(tx.as_signed(), tx.signer()),
);
let mut evm = revm::Evm::builder().with_env(env).with_db(&mut db).build();
let ResultAndState { result, state } = match evm.transact() {
Ok(res) => res,
Err(err) => {
match err {
EVMError::Transaction(err) => {
if matches!(err, InvalidTransaction::NonceTooLow { .. }) {
} else {
best_txs.mark_invalid(
&pool_tx,
InvalidPoolTransactionError::Consensus(
InvalidTransactionError::TxTypeNotSupported,
),
);
}
continue
}
err => {
return Err(Self::Error::from_evm_err(err))
}
}
}
};
drop(evm);
db.commit(state);
if let Some(tx_blob_gas) = tx.blob_gas_used() {
sum_blob_gas_used += tx_blob_gas;
if sum_blob_gas_used == MAX_DATA_GAS_PER_BLOCK {
best_txs.skip_blobs();
}
}
let gas_used = result.gas_used();
cumulative_gas_used += gas_used;
let (tx, sender) = tx.to_components();
executed_txs.push(tx);
senders.push(sender);
results.push(result);
}
let balance_increments = post_block_withdrawals_balance_increments(
chain_spec.as_ref(),
block_env.timestamp.try_into().unwrap_or(u64::MAX),
&[],
);
db.increment_balances(balance_increments).map_err(Self::Error::from_eth_err)?;
db.merge_transitions(BundleRetention::PlainState);
let bundle_state = db.take_bundle();
let hashed_state = db.database.hashed_post_state(&bundle_state);
let state_root = db.database.state_root(hashed_state).map_err(Self::Error::from_eth_err)?;
let (block, receipts) = self.assemble_block_and_receipts(
&block_env,
parent_hash,
state_root,
executed_txs,
results,
);
Ok((SealedBlockWithSenders { block: block.seal_slow(), senders }, receipts))
}
}