// reth_rpc_eth_api/helpers/pending_block.rs
use std::time::{Duration, Instant};
use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
use alloy_consensus::{Header, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{
eip4844::MAX_DATA_GAS_PER_BLOCK, eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE,
};
use alloy_primitives::{BlockNumber, B256, U256};
use alloy_rpc_types_eth::BlockNumberOrTag;
use futures::Future;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_evm::{
state_change::post_block_withdrawals_balance_increments, system_calls::SystemCaller,
ConfigureEvm, ConfigureEvmEnv,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{
proofs::calculate_transaction_root,
revm_primitives::{
BlockEnv, CfgEnv, CfgEnvWithHandlerCfg, EVMError, Env, ExecutionResult, InvalidTransaction,
ResultAndState, SpecId,
},
Block, BlockBody, Receipt, SealedBlockWithSenders, SealedHeader, TransactionSignedEcRecovered,
};
use reth_provider::{
BlockReader, BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, ProviderError,
ReceiptProvider, StateProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_eth_types::{EthApiError, PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin};
use reth_transaction_pool::{BestTransactionsAttributes, TransactionPool};
use reth_trie::HashedPostState;
use revm::{db::states::bundle_state::BundleRetention, DatabaseCommit, State};
use tokio::sync::Mutex;
use tracing::debug;
use super::SpawnBlocking;
pub trait LoadPendingBlock:
EthApiTypes
+ RpcNodeCore<
Provider: BlockReaderIdExt
+ EvmEnvProvider
+ ChainSpecProvider<ChainSpec: EthChainSpec + EthereumHardforks>
+ StateProviderFactory,
Pool: TransactionPool,
Evm: ConfigureEvm<Header = Header>,
>
{
/// Returns the mutex-guarded slot that holds the most recently built local pending block, if
/// any.
///
/// [`LoadPendingBlock::local_pending_block`] locks this slot both to reuse a previously built
/// block and to store a freshly built one.
fn pending_block(&self) -> &Mutex<Option<PendingBlock>>;
fn pending_block_env_and_cfg(&self) -> Result<PendingBlockEnv, Self::Error> {
let origin: PendingBlockEnvOrigin = if let Some(pending) =
self.provider().pending_block_with_senders().map_err(Self::Error::from_eth_err)?
{
PendingBlockEnvOrigin::ActualPending(pending)
} else {
let latest = self
.provider()
.latest_header()
.map_err(Self::Error::from_eth_err)?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let (mut latest_header, block_hash) = latest.split();
latest_header.number += 1;
latest_header.timestamp += 12;
let chain_spec = self.provider().chain_spec();
latest_header.base_fee_per_gas = latest_header.next_block_base_fee(
chain_spec.base_fee_params_at_timestamp(latest_header.timestamp),
);
latest_header.excess_blob_gas = latest_header.next_block_excess_blob_gas();
let latest = SealedHeader::new(latest_header, block_hash);
PendingBlockEnvOrigin::DerivedFromLatest(latest)
};
let mut cfg = CfgEnvWithHandlerCfg::new_with_spec_id(CfgEnv::default(), SpecId::LATEST);
let mut block_env = BlockEnv::default();
self.provider()
.fill_env_with_header(
&mut cfg,
&mut block_env,
origin.header(),
self.evm_config().clone(),
)
.map_err(Self::Error::from_eth_err)?;
Ok(PendingBlockEnv::new(cfg, block_env, origin))
}
fn local_pending_block(
&self,
) -> impl Future<Output = Result<Option<(SealedBlockWithSenders, Vec<Receipt>)>, Self::Error>> + Send
where
Self: SpawnBlocking,
{
async move {
let pending = self.pending_block_env_and_cfg()?;
if pending.origin.is_actual_pending() {
if let Some(block) = pending.origin.clone().into_actual_pending() {
if let Some(receipts) = self
.provider()
.receipts_by_block(block.hash().into())
.map_err(Self::Error::from_eth_err)?
{
return Ok(Some((block, receipts)))
}
}
}
let mut lock = self.pending_block().lock().await;
let now = Instant::now();
if let Some(pending_block) = lock.as_ref() {
if pending.block_env.number.to::<u64>() == pending_block.block.number &&
pending.origin.header().hash() == pending_block.block.parent_hash &&
now <= pending_block.expires_at
{
return Ok(Some((pending_block.block.clone(), pending_block.receipts.clone())));
}
}
let (sealed_block, receipts) = match self
.spawn_blocking_io(move |this| {
this.build_block(pending)
})
.await
{
Ok(block) => block,
Err(err) => {
debug!(target: "rpc", "Failed to build pending block: {:?}", err);
return Ok(None)
}
};
let now = Instant::now();
*lock = Some(PendingBlock::new(
now + Duration::from_secs(1),
sealed_block.clone(),
receipts.clone(),
));
Ok(Some((sealed_block, receipts)))
}
}
/// Creates the [`Receipt`] for an executed transaction.
///
/// The transaction type comes from `tx`, the success flag and logs from `result`, and
/// `cumulative_gas_used` is stored as given. Any remaining receipt fields take their default
/// values.
fn assemble_receipt(
    &self,
    tx: &TransactionSignedEcRecovered,
    result: ExecutionResult,
    cumulative_gas_used: u64,
) -> Receipt {
    let tx_type = tx.tx_type();
    // Read the status first: `into_logs` consumes `result`.
    let success = result.is_success();
    let logs = result.into_logs().into_iter().map(Into::into).collect();

    #[allow(clippy::needless_update)]
    Receipt { tx_type, success, cumulative_gas_used, logs, ..Default::default() }
}
/// Computes the receipts root of `block_number` from `execution_outcome`.
///
/// `_block_env` is not used by this default implementation.
///
/// # Panics
///
/// Panics with "Block is present" if `receipts_root_slow` does not produce a root for
/// `block_number`.
fn receipts_root(
    &self,
    _block_env: &BlockEnv,
    execution_outcome: &ExecutionOutcome,
    block_number: BlockNumber,
) -> B256 {
    let maybe_root = execution_outcome.receipts_root_slow(block_number);
    maybe_root.expect("Block is present")
}
/// Builds a pending block on top of the state at `env.origin.build_target_hash()` by executing
/// transactions taken from the pool.
///
/// Steps, in order:
/// 1. Open the historical state at the target hash and wrap it in a bundle-tracking [`State`].
/// 2. Run the pre-block system calls: the beacon-root call only when the origin is an actual
///    pending block, the block-hashes call always.
/// 3. Execute pool transactions in the order yielded by the pool's best-transactions iterator,
///    skipping those that exceed the remaining gas or blob-gas budget, those with a private
///    origin, and those the EVM rejects as invalid.
/// 4. Apply withdrawal balance increments (withdrawals are only taken from an actual pending
///    block; a derived origin has none).
/// 5. Compute the receipts root, logs bloom, state root and transactions root, assemble the
///    header and seal the block.
///
/// Returns the sealed block with the senders of the included transactions, plus one receipt
/// per included transaction.
///
/// # Errors
///
/// Returns provider/database failures converted with `from_eth_err`, system-call failures
/// wrapped in [`EthApiError::Internal`], and any EVM error other than
/// [`EVMError::Transaction`] converted with `from_evm_err`.
///
/// # Panics
///
/// Panics with "Block is present" if the receipts root or logs bloom for `block_number` cannot
/// be obtained from the freshly created [`ExecutionOutcome`].
fn build_block(
&self,
env: PendingBlockEnv,
) -> Result<(SealedBlockWithSenders, Vec<Receipt>), Self::Error>
where
EthApiError: From<ProviderError>,
{
let PendingBlockEnv { cfg, block_env, origin } = env;
// Hash of the block whose state the new block is built on; also used as `parent_hash` of the
// assembled header.
let parent_hash = origin.build_target_hash();
let state_provider = self
.provider()
.history_by_block_hash(parent_hash)
.map_err(Self::Error::from_eth_err)?;
let state = StateProviderDatabase::new(state_provider);
// Bundle updates are enabled so the accumulated changes can be taken with `take_bundle` below.
let mut db = State::builder().with_database(state).with_bundle_update().build();
// Running totals over the transactions included so far.
let mut cumulative_gas_used = 0;
let mut sum_blob_gas_used = 0;
let block_gas_limit: u64 = block_env.gas_limit.to::<u64>();
let base_fee = block_env.basefee.to::<u64>();
let block_number = block_env.number.to::<u64>();
let mut executed_txs = Vec::new();
let mut senders = Vec::new();
// Pool iterator parameterised with the block's base fee and (if any) blob gas price.
let mut best_txs =
self.pool().best_transactions_with_attributes(BestTransactionsAttributes::new(
base_fee,
block_env.get_blob_gasprice().map(|gasprice| gasprice as u64),
));
// Withdrawals are copied from an actual pending block; a derived origin has none.
let (withdrawals, withdrawals_root) = match origin {
PendingBlockEnvOrigin::ActualPending(ref block) => {
(block.body.withdrawals.clone(), block.withdrawals_root)
}
PendingBlockEnvOrigin::DerivedFromLatest(_) => (None, None),
};
let chain_spec = self.provider().chain_spec();
let mut system_caller = SystemCaller::new(self.evm_config().clone(), chain_spec.clone());
// The beacon-root system call is only made when the origin is an actual pending block, using
// that block's `parent_beacon_block_root`; otherwise the header field is left as `None`.
let parent_beacon_block_root = if origin.is_actual_pending() {
system_caller
.pre_block_beacon_root_contract_call(
&mut db,
&cfg,
&block_env,
origin.header().parent_beacon_block_root,
)
.map_err(|err| EthApiError::Internal(err.into()))?;
origin.header().parent_beacon_block_root
} else {
None
};
// The block-hashes system call runs for both origin kinds, with the origin header's hash.
system_caller
.pre_block_blockhashes_contract_call(&mut db, &cfg, &block_env, origin.header().hash())
.map_err(|err| EthApiError::Internal(err.into()))?;
let mut receipts = Vec::new();
// `while let` rather than `for`: the loop body calls methods on `best_txs` itself.
while let Some(pool_tx) = best_txs.next() {
// Skip transactions whose gas limit does not fit into the remaining block gas.
// NOTE(review): `mark_invalid` presumably also removes dependent transactions from the
// iterator -- confirm against the pool's iterator documentation.
if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
best_txs.mark_invalid(&pool_tx);
continue
}
// Skip transactions whose pool origin is flagged as private.
if pool_tx.origin.is_private() {
best_txs.mark_invalid(&pool_tx);
continue
}
let tx = pool_tx.to_recovered_transaction();
// Skip EIP-4844 transactions that would exceed the per-block blob gas maximum.
if let Some(blob_tx) = tx.transaction.as_eip4844() {
let tx_blob_gas = blob_tx.blob_gas();
if sum_blob_gas_used + tx_blob_gas > MAX_DATA_GAS_PER_BLOCK {
best_txs.mark_invalid(&pool_tx);
continue
}
}
// Per-transaction EVM environment: shared cfg and block env plus this transaction's env.
let env = Env::boxed(
cfg.cfg_env.clone(),
block_env.clone(),
Self::evm_config(self).tx_env(tx.as_signed(), tx.signer()),
);
let mut evm = revm::Evm::builder().with_env(env).with_db(&mut db).build();
let ResultAndState { result, state } = match evm.transact() {
Ok(res) => res,
Err(err) => {
match err {
// Transaction-level rejection: skip this transaction and keep building.
EVMError::Transaction(err) => {
if matches!(err, InvalidTransaction::NonceTooLow { .. }) {
// Intentionally empty: a too-low nonce skips the transaction without marking
// it invalid in the iterator.
} else {
best_txs.mark_invalid(&pool_tx);
}
continue
}
// Any other EVM error aborts this build attempt.
err => {
return Err(Self::Error::from_evm_err(err))
}
}
}
};
// Release the EVM's mutable borrow of `db` before committing the state changes.
drop(evm);
db.commit(state);
// Account for blob gas only after the transaction executed; once the maximum is reached
// exactly, tell the iterator to skip further blob transactions.
if let Some(blob_tx) = tx.transaction.as_eip4844() {
let tx_blob_gas = blob_tx.blob_gas();
sum_blob_gas_used += tx_blob_gas;
if sum_blob_gas_used == MAX_DATA_GAS_PER_BLOCK {
best_txs.skip_blobs();
}
}
let gas_used = result.gas_used();
// Update the running total first so the receipt records the cumulative value including
// this transaction.
cumulative_gas_used += gas_used;
receipts.push(Some(self.assemble_receipt(&tx, result, cumulative_gas_used)));
let (tx, sender) = tx.to_components();
executed_txs.push(tx);
senders.push(sender);
}
// Withdrawal balance increments; a timestamp that does not fit into `u64` falls back to
// `u64::MAX`.
let balance_increments = post_block_withdrawals_balance_increments(
chain_spec.as_ref(),
block_env.timestamp.try_into().unwrap_or(u64::MAX),
&withdrawals.clone().unwrap_or_default(),
);
db.increment_balances(balance_increments).map_err(Self::Error::from_eth_err)?;
// Fold all recorded transitions into the bundle state before taking it.
db.merge_transitions(BundleRetention::PlainState);
// Single-block execution outcome; `receipts` is cloned because it is returned below as well.
let execution_outcome = ExecutionOutcome::new(
db.take_bundle(),
vec![receipts.clone()].into(),
block_number,
Vec::new(),
);
let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state);
let receipts_root = self.receipts_root(&block_env, &execution_outcome, block_number);
let logs_bloom =
execution_outcome.block_logs_bloom(block_number).expect("Block is present");
// The state root is computed by the underlying state provider from the hashed post state.
let state_root = db.database.state_root(hashed_state).map_err(Self::Error::from_eth_err)?;
let transactions_root = calculate_transaction_root(&executed_txs);
// `blob_gas_used` is only set when the configured spec is Cancun or later.
let blob_gas_used =
(cfg.handler_cfg.spec_id >= SpecId::CANCUN).then_some(sum_blob_gas_used);
// `requests_hash` is only set (to the empty-requests hash) when Prague is active at the block
// timestamp.
let requests_hash = chain_spec
.is_prague_active_at_timestamp(block_env.timestamp.to::<u64>())
.then_some(EMPTY_REQUESTS_HASH);
let header = Header {
parent_hash,
ommers_hash: EMPTY_OMMER_ROOT_HASH,
beneficiary: block_env.coinbase,
state_root,
transactions_root,
receipts_root,
withdrawals_root,
logs_bloom,
timestamp: block_env.timestamp.to::<u64>(),
mix_hash: block_env.prevrandao.unwrap_or_default(),
nonce: BEACON_NONCE.into(),
base_fee_per_gas: Some(base_fee),
number: block_number,
gas_limit: block_gas_limit,
difficulty: U256::ZERO,
gas_used: cumulative_gas_used,
blob_gas_used: blob_gas_used.map(Into::into),
excess_blob_gas: block_env.get_blob_excess_gas().map(Into::into),
extra_data: Default::default(),
parent_beacon_block_root,
requests_hash,
};
// Every entry pushed above is `Some`, so flattening only unwraps the receipts.
let receipts: Vec<Receipt> = receipts.into_iter().flatten().collect();
let block = Block {
header,
body: BlockBody { transactions: executed_txs, ommers: vec![], withdrawals },
};
Ok((SealedBlockWithSenders { block: block.seal_slow(), senders }, receipts))
}
}