reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip7840::BlobParams;
8use alloy_primitives::{B256, U256};
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use futures::Future;
11use reth_chain_state::{BlockState, ExecutedBlock};
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError, RethError};
14use reth_evm::{
15 execute::{BlockBuilder, BlockBuilderOutcome, ExecutionOutcome},
16 ConfigureEvm, Evm, NextBlockEnvAttributes, SpecFor,
17};
18use reth_primitives_traits::{transaction::error::InvalidTransactionError, HeaderTy, SealedHeader};
19use reth_revm::{database::StateProviderDatabase, db::State};
20use reth_rpc_convert::RpcConvert;
21use reth_rpc_eth_types::{
22 block::BlockAndReceipts, builder::config::PendingBlockKind, EthApiError, PendingBlock,
23 PendingBlockEnv, PendingBlockEnvOrigin,
24};
25use reth_storage_api::{
26 noop::NoopProvider, BlockReader, BlockReaderIdExt, ProviderBlock, ProviderHeader,
27 ProviderReceipt, ProviderTx, ReceiptProvider, StateProviderBox, StateProviderFactory,
28};
29use reth_transaction_pool::{
30 error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes,
31 PoolTransaction, TransactionPool,
32};
33use revm::context_interface::Block;
34use std::{
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::sync::Mutex;
39use tracing::debug;
40
41pub trait LoadPendingBlock:
45 EthApiTypes<
46 Error: FromEvmError<Self::Evm>,
47 RpcConvert: RpcConvert<Network = Self::NetworkTypes>,
48 > + RpcNodeCore
49{
50 fn pending_block(&self) -> &Mutex<Option<PendingBlock<Self::Primitives>>>;
54
55 fn pending_env_builder(&self) -> &dyn PendingEnvBuilder<Self::Evm>;
57
58 fn pending_block_kind(&self) -> PendingBlockKind;
60
61 #[expect(clippy::type_complexity)]
65 fn pending_block_env_and_cfg(
66 &self,
67 ) -> Result<
68 PendingBlockEnv<
69 ProviderBlock<Self::Provider>,
70 ProviderReceipt<Self::Provider>,
71 SpecFor<Self::Evm>,
72 >,
73 Self::Error,
74 > {
75 if let Some(block) = self.provider().pending_block().map_err(Self::Error::from_eth_err)? &&
76 let Some(receipts) = self
77 .provider()
78 .receipts_by_block(block.hash().into())
79 .map_err(Self::Error::from_eth_err)?
80 {
81 let evm_env = self
85 .evm_config()
86 .evm_env(block.header())
87 .map_err(RethError::other)
88 .map_err(Self::Error::from_eth_err)?;
89
90 return Ok(PendingBlockEnv::new(
91 evm_env,
92 PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
93 ));
94 }
95
96 let latest = self
99 .provider()
100 .latest_header()
101 .map_err(Self::Error::from_eth_err)?
102 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
103
104 let evm_env = self
105 .evm_config()
106 .next_evm_env(&latest, &self.next_env_attributes(&latest)?)
107 .map_err(RethError::other)
108 .map_err(Self::Error::from_eth_err)?;
109
110 Ok(PendingBlockEnv::new(evm_env, PendingBlockEnvOrigin::DerivedFromLatest(latest)))
111 }
112
113 fn next_env_attributes(
115 &self,
116 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
117 ) -> Result<<Self::Evm as ConfigureEvm>::NextBlockEnvCtx, Self::Error> {
118 Ok(self.pending_env_builder().pending_env_attributes(parent)?)
119 }
120
121 fn local_pending_state(
123 &self,
124 ) -> impl Future<Output = Result<Option<StateProviderBox>, Self::Error>> + Send
125 where
126 Self: SpawnBlocking,
127 {
128 async move {
129 let Some(pending_block) = self.pool_pending_block().await? else {
130 return Ok(None);
131 };
132
133 let latest_historical = self
134 .provider()
135 .history_by_block_hash(pending_block.block().parent_hash())
136 .map_err(Self::Error::from_eth_err)?;
137
138 let state = BlockState::from(pending_block);
139
140 Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
141 }
142 }
143
144 fn pool_pending_block(
146 &self,
147 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
148 where
149 Self: SpawnBlocking,
150 {
151 async move {
152 if self.pending_block_kind().is_none() {
153 return Ok(None);
154 }
155 let pending = self.pending_block_env_and_cfg()?;
156 let parent = match pending.origin {
157 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
158 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
159 };
160
161 let mut lock = self.pending_block().lock().await;
163
164 let now = Instant::now();
165
166 if let Some(pending_block) = lock.as_ref() {
168 if pending.evm_env.block_env.number == U256::from(pending_block.block().number()) &&
170 parent.hash() == pending_block.block().parent_hash() &&
171 now <= pending_block.expires_at
172 {
173 return Ok(Some(pending_block.clone()));
174 }
175 }
176
177 let executed_block = match self
178 .spawn_blocking_io(move |this| {
179 this.build_block(&parent)
181 })
182 .await
183 {
184 Ok(block) => block,
185 Err(err) => {
186 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
187 return Ok(None)
188 }
189 };
190
191 let pending = PendingBlock::with_executed_block(
192 Instant::now() + Duration::from_secs(1),
193 executed_block,
194 );
195
196 *lock = Some(pending.clone());
197
198 Ok(Some(pending))
199 }
200 }
201
202 fn local_pending_block(
204 &self,
205 ) -> impl Future<Output = Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error>> + Send
206 where
207 Self: SpawnBlocking,
208 Self::Pool:
209 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
210 {
211 async move {
212 if self.pending_block_kind().is_none() {
213 return Ok(None);
214 }
215
216 let pending = self.pending_block_env_and_cfg()?;
217
218 Ok(match pending.origin {
219 PendingBlockEnvOrigin::ActualPending(block, receipts) => {
220 Some(BlockAndReceipts { block, receipts })
221 }
222 PendingBlockEnvOrigin::DerivedFromLatest(..) => {
223 self.pool_pending_block().await?.map(PendingBlock::into_block_and_receipts)
224 }
225 })
226 }
227 }
228
229 fn build_block(
236 &self,
237 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
238 ) -> Result<ExecutedBlock<Self::Primitives>, Self::Error>
239 where
240 Self::Pool:
241 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
242 EthApiError: From<ProviderError>,
243 {
244 let state_provider = self
245 .provider()
246 .history_by_block_hash(parent.hash())
247 .map_err(Self::Error::from_eth_err)?;
248 let state = StateProviderDatabase::new(&state_provider);
249 let mut db = State::builder().with_database(state).with_bundle_update().build();
250
251 let mut builder = self
252 .evm_config()
253 .builder_for_next_block(&mut db, parent, self.next_env_attributes(parent)?)
254 .map_err(RethError::other)
255 .map_err(Self::Error::from_eth_err)?;
256
257 builder.apply_pre_execution_changes().map_err(Self::Error::from_eth_err)?;
258
259 let block_env = builder.evm_mut().block().clone();
260
261 let blob_params = self
262 .provider()
263 .chain_spec()
264 .blob_params_at_timestamp(parent.timestamp())
265 .unwrap_or_else(BlobParams::cancun);
266 let mut cumulative_gas_used = 0;
267 let mut sum_blob_gas_used = 0;
268 let block_gas_limit: u64 = block_env.gas_limit;
269
270 if !self.pending_block_kind().is_empty() {
272 let mut best_txs = self
273 .pool()
274 .best_transactions_with_attributes(BestTransactionsAttributes::new(
275 block_env.basefee,
276 block_env.blob_gasprice().map(|gasprice| gasprice as u64),
277 ))
278 .without_updates();
280
281 while let Some(pool_tx) = best_txs.next() {
282 if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
284 best_txs.mark_invalid(
288 &pool_tx,
289 InvalidPoolTransactionError::ExceedsGasLimit(
290 pool_tx.gas_limit(),
291 block_gas_limit,
292 ),
293 );
294 continue
295 }
296
297 if pool_tx.origin.is_private() {
298 best_txs.mark_invalid(
302 &pool_tx,
303 InvalidPoolTransactionError::Consensus(
304 InvalidTransactionError::TxTypeNotSupported,
305 ),
306 );
307 continue
308 }
309
310 let tx = pool_tx.to_consensus();
312
313 if let Some(tx_blob_gas) = tx.blob_gas_used() &&
316 sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
317 {
318 best_txs.mark_invalid(
323 &pool_tx,
324 InvalidPoolTransactionError::ExceedsGasLimit(
325 tx_blob_gas,
326 blob_params.max_blob_gas_per_block(),
327 ),
328 );
329 continue
330 }
331
332 let gas_used = match builder.execute_transaction(tx.clone()) {
333 Ok(gas_used) => gas_used,
334 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
335 error,
336 ..
337 })) => {
338 if error.is_nonce_too_low() {
339 } else {
341 best_txs.mark_invalid(
344 &pool_tx,
345 InvalidPoolTransactionError::Consensus(
346 InvalidTransactionError::TxTypeNotSupported,
347 ),
348 );
349 }
350 continue
351 }
352 Err(err) => return Err(Self::Error::from_eth_err(err)),
354 };
355
356 if let Some(tx_blob_gas) = tx.blob_gas_used() {
358 sum_blob_gas_used += tx_blob_gas;
359
360 if sum_blob_gas_used == blob_params.max_blob_gas_per_block() {
362 best_txs.skip_blobs();
363 }
364 }
365
366 cumulative_gas_used += gas_used;
369 }
370 }
371
372 let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
373 builder.finish(NoopProvider::default()).map_err(Self::Error::from_eth_err)?;
374
375 let execution_outcome = ExecutionOutcome::new(
376 db.take_bundle(),
377 vec![execution_result.receipts],
378 block.number(),
379 vec![execution_result.requests],
380 );
381
382 Ok(ExecutedBlock {
383 recovered_block: block.into(),
384 execution_output: Arc::new(execution_outcome),
385 hashed_state: Arc::new(hashed_state),
386 })
387 }
388}
389
390pub trait PendingEnvBuilder<Evm: ConfigureEvm>: Send + Sync + Unpin + 'static {
392 fn pending_env_attributes(
394 &self,
395 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
396 ) -> Result<Evm::NextBlockEnvCtx, EthApiError>;
397}
398
399pub trait BuildPendingEnv<Header> {
405 fn build_pending_env(parent: &SealedHeader<Header>) -> Self;
407}
408
409impl<Evm> PendingEnvBuilder<Evm> for ()
410where
411 Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<Evm::Primitives>>>,
412{
413 fn pending_env_attributes(
414 &self,
415 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
416 ) -> Result<Evm::NextBlockEnvCtx, EthApiError> {
417 Ok(Evm::NextBlockEnvCtx::build_pending_env(parent))
418 }
419}
420
421impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
422 fn build_pending_env(parent: &SealedHeader<H>) -> Self {
423 Self {
424 timestamp: parent.timestamp().saturating_add(12),
425 suggested_fee_recipient: parent.beneficiary(),
426 prev_randao: B256::random(),
427 gas_limit: parent.gas_limit(),
428 parent_beacon_block_root: parent.parent_beacon_block_root().map(|_| B256::ZERO),
429 withdrawals: parent.withdrawals_root().map(|_| Default::default()),
430 }
431 }
432}