reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip7840::BlobParams;
8use alloy_primitives::{B256, U256};
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use futures::Future;
11use reth_chain_state::{BlockState, ExecutedBlock};
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError, RethError};
14use reth_evm::{
15 execute::{BlockBuilder, BlockBuilderOutcome, ExecutionOutcome},
16 ConfigureEvm, Evm, NextBlockEnvAttributes, SpecFor,
17};
18use reth_primitives_traits::{transaction::error::InvalidTransactionError, HeaderTy, SealedHeader};
19use reth_revm::{database::StateProviderDatabase, db::State};
20use reth_rpc_convert::RpcConvert;
21use reth_rpc_eth_types::{
22 builder::config::PendingBlockKind, pending_block::PendingBlockAndReceipts, EthApiError,
23 PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin,
24};
25use reth_storage_api::{
26 BlockReader, BlockReaderIdExt, ProviderBlock, ProviderHeader, ProviderReceipt, ProviderTx,
27 ReceiptProvider, StateProviderBox, StateProviderFactory,
28};
29use reth_transaction_pool::{
30 error::InvalidPoolTransactionError, BestTransactionsAttributes, PoolTransaction,
31 TransactionPool,
32};
33use revm::context_interface::Block;
34use std::{
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::sync::Mutex;
39use tracing::debug;
40
41pub trait LoadPendingBlock:
45 EthApiTypes<
46 Error: FromEvmError<Self::Evm>,
47 RpcConvert: RpcConvert<Network = Self::NetworkTypes>,
48 > + RpcNodeCore
49{
50 fn pending_block(&self) -> &Mutex<Option<PendingBlock<Self::Primitives>>>;
54
55 fn pending_env_builder(&self) -> &dyn PendingEnvBuilder<Self::Evm>;
57
58 fn pending_block_kind(&self) -> PendingBlockKind;
60
61 #[expect(clippy::type_complexity)]
65 fn pending_block_env_and_cfg(
66 &self,
67 ) -> Result<
68 PendingBlockEnv<
69 ProviderBlock<Self::Provider>,
70 ProviderReceipt<Self::Provider>,
71 SpecFor<Self::Evm>,
72 >,
73 Self::Error,
74 > {
75 if let Some(block) = self.provider().pending_block().map_err(Self::Error::from_eth_err)? {
76 if let Some(receipts) = self
77 .provider()
78 .receipts_by_block(block.hash().into())
79 .map_err(Self::Error::from_eth_err)?
80 {
81 let evm_env = self.evm_config().evm_env(block.header());
85
86 return Ok(PendingBlockEnv::new(
87 evm_env,
88 PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
89 ));
90 }
91 }
92
93 let latest = self
96 .provider()
97 .latest_header()
98 .map_err(Self::Error::from_eth_err)?
99 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
100
101 let evm_env = self
102 .evm_config()
103 .next_evm_env(&latest, &self.next_env_attributes(&latest)?)
104 .map_err(RethError::other)
105 .map_err(Self::Error::from_eth_err)?;
106
107 Ok(PendingBlockEnv::new(evm_env, PendingBlockEnvOrigin::DerivedFromLatest(latest)))
108 }
109
110 fn next_env_attributes(
112 &self,
113 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
114 ) -> Result<<Self::Evm as ConfigureEvm>::NextBlockEnvCtx, Self::Error> {
115 Ok(self.pending_env_builder().pending_env_attributes(parent)?)
116 }
117
118 fn local_pending_state(
120 &self,
121 ) -> impl Future<Output = Result<Option<StateProviderBox>, Self::Error>> + Send
122 where
123 Self: SpawnBlocking,
124 {
125 async move {
126 let Some(pending_block) = self.pool_pending_block().await? else {
127 return Ok(None);
128 };
129
130 let latest_historical = self
131 .provider()
132 .history_by_block_hash(pending_block.block().parent_hash())
133 .map_err(Self::Error::from_eth_err)?;
134
135 let state = BlockState::from(pending_block);
136
137 Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
138 }
139 }
140
141 fn pool_pending_block(
143 &self,
144 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
145 where
146 Self: SpawnBlocking,
147 {
148 async move {
149 if self.pending_block_kind().is_none() {
150 return Ok(None);
151 }
152 let pending = self.pending_block_env_and_cfg()?;
153 let parent = match pending.origin {
154 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
155 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
156 };
157
158 let mut lock = self.pending_block().lock().await;
160
161 let now = Instant::now();
162
163 if let Some(pending_block) = lock.as_ref() {
165 if pending.evm_env.block_env.number == U256::from(pending_block.block().number()) &&
167 parent.hash() == pending_block.block().parent_hash() &&
168 now <= pending_block.expires_at
169 {
170 return Ok(Some(pending_block.clone()));
171 }
172 }
173
174 let executed_block = match self
175 .spawn_blocking_io(move |this| {
176 this.build_block(&parent)
178 })
179 .await
180 {
181 Ok(block) => block,
182 Err(err) => {
183 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
184 return Ok(None)
185 }
186 };
187
188 let pending = PendingBlock::with_executed_block(
189 Instant::now() + Duration::from_secs(1),
190 executed_block,
191 );
192
193 *lock = Some(pending.clone());
194
195 Ok(Some(pending))
196 }
197 }
198
199 fn local_pending_block(
201 &self,
202 ) -> impl Future<Output = Result<Option<PendingBlockAndReceipts<Self::Primitives>>, Self::Error>>
203 + Send
204 where
205 Self: SpawnBlocking,
206 Self::Pool:
207 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
208 {
209 async move {
210 if self.pending_block_kind().is_none() {
211 return Ok(None);
212 }
213
214 let pending = self.pending_block_env_and_cfg()?;
215
216 Ok(match pending.origin {
217 PendingBlockEnvOrigin::ActualPending(block, receipts) => Some((block, receipts)),
218 PendingBlockEnvOrigin::DerivedFromLatest(..) => {
219 self.pool_pending_block().await?.map(PendingBlock::into_block_and_receipts)
220 }
221 })
222 }
223 }
224
225 fn build_block(
232 &self,
233 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
234 ) -> Result<ExecutedBlock<Self::Primitives>, Self::Error>
235 where
236 Self::Pool:
237 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
238 EthApiError: From<ProviderError>,
239 {
240 let state_provider = self
241 .provider()
242 .history_by_block_hash(parent.hash())
243 .map_err(Self::Error::from_eth_err)?;
244 let state = StateProviderDatabase::new(&state_provider);
245 let mut db = State::builder().with_database(state).with_bundle_update().build();
246
247 let mut builder = self
248 .evm_config()
249 .builder_for_next_block(&mut db, parent, self.next_env_attributes(parent)?)
250 .map_err(RethError::other)
251 .map_err(Self::Error::from_eth_err)?;
252
253 builder.apply_pre_execution_changes().map_err(Self::Error::from_eth_err)?;
254
255 let block_env = builder.evm_mut().block().clone();
256
257 let blob_params = self
258 .provider()
259 .chain_spec()
260 .blob_params_at_timestamp(parent.timestamp())
261 .unwrap_or_else(BlobParams::cancun);
262 let mut cumulative_gas_used = 0;
263 let mut sum_blob_gas_used = 0;
264 let block_gas_limit: u64 = block_env.gas_limit;
265
266 if !self.pending_block_kind().is_empty() {
268 let mut best_txs =
269 self.pool().best_transactions_with_attributes(BestTransactionsAttributes::new(
270 block_env.basefee,
271 block_env.blob_gasprice().map(|gasprice| gasprice as u64),
272 ));
273
274 while let Some(pool_tx) = best_txs.next() {
275 if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
277 best_txs.mark_invalid(
281 &pool_tx,
282 InvalidPoolTransactionError::ExceedsGasLimit(
283 pool_tx.gas_limit(),
284 block_gas_limit,
285 ),
286 );
287 continue
288 }
289
290 if pool_tx.origin.is_private() {
291 best_txs.mark_invalid(
295 &pool_tx,
296 InvalidPoolTransactionError::Consensus(
297 InvalidTransactionError::TxTypeNotSupported,
298 ),
299 );
300 continue
301 }
302
303 let tx = pool_tx.to_consensus();
305
306 if let Some(tx_blob_gas) = tx.blob_gas_used() {
309 if sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block() {
310 best_txs.mark_invalid(
315 &pool_tx,
316 InvalidPoolTransactionError::ExceedsGasLimit(
317 tx_blob_gas,
318 blob_params.max_blob_gas_per_block(),
319 ),
320 );
321 continue
322 }
323 }
324
325 let gas_used = match builder.execute_transaction(tx.clone()) {
326 Ok(gas_used) => gas_used,
327 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
328 error,
329 ..
330 })) => {
331 if error.is_nonce_too_low() {
332 } else {
334 best_txs.mark_invalid(
337 &pool_tx,
338 InvalidPoolTransactionError::Consensus(
339 InvalidTransactionError::TxTypeNotSupported,
340 ),
341 );
342 }
343 continue
344 }
345 Err(err) => return Err(Self::Error::from_eth_err(err)),
347 };
348
349 if let Some(tx_blob_gas) = tx.blob_gas_used() {
351 sum_blob_gas_used += tx_blob_gas;
352
353 if sum_blob_gas_used == blob_params.max_blob_gas_per_block() {
355 best_txs.skip_blobs();
356 }
357 }
358
359 cumulative_gas_used += gas_used;
362 }
363 }
364
365 let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
366 builder.finish(&state_provider).map_err(Self::Error::from_eth_err)?;
367
368 let execution_outcome = ExecutionOutcome::new(
369 db.take_bundle(),
370 vec![execution_result.receipts],
371 block.number(),
372 vec![execution_result.requests],
373 );
374
375 Ok(ExecutedBlock {
376 recovered_block: block.into(),
377 execution_output: Arc::new(execution_outcome),
378 hashed_state: Arc::new(hashed_state),
379 })
380 }
381}
382
383pub trait PendingEnvBuilder<Evm: ConfigureEvm>: Send + Sync + Unpin + 'static {
385 fn pending_env_attributes(
387 &self,
388 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
389 ) -> Result<Evm::NextBlockEnvCtx, EthApiError>;
390}
391
392pub trait BuildPendingEnv<Header> {
398 fn build_pending_env(parent: &SealedHeader<Header>) -> Self;
400}
401
402impl<Evm> PendingEnvBuilder<Evm> for ()
403where
404 Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<Evm::Primitives>>>,
405{
406 fn pending_env_attributes(
407 &self,
408 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
409 ) -> Result<Evm::NextBlockEnvCtx, EthApiError> {
410 Ok(Evm::NextBlockEnvCtx::build_pending_env(parent))
411 }
412}
413
414impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
415 fn build_pending_env(parent: &SealedHeader<H>) -> Self {
416 Self {
417 timestamp: parent.timestamp().saturating_add(12),
418 suggested_fee_recipient: parent.beneficiary(),
419 prev_randao: B256::random(),
420 gas_limit: parent.gas_limit(),
421 parent_beacon_block_root: parent.parent_beacon_block_root().map(|_| B256::ZERO),
422 withdrawals: parent.withdrawals_root().map(|_| Default::default()),
423 }
424 }
425}