reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip7840::BlobParams;
8use alloy_primitives::{B256, U256};
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use futures::Future;
11use reth_chain_state::{BlockState, ComputedTrieData, ExecutedBlock};
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError, RethError};
14use reth_evm::{
15 execute::{BlockBuilder, BlockBuilderOutcome, BlockExecutionOutput},
16 ConfigureEvm, Evm, EvmEnvFor, NextBlockEnvAttributes,
17};
18use reth_primitives_traits::{transaction::error::InvalidTransactionError, HeaderTy, SealedHeader};
19use reth_revm::{database::StateProviderDatabase, db::State};
20use reth_rpc_convert::RpcConvert;
21use reth_rpc_eth_types::{
22 block::BlockAndReceipts, builder::config::PendingBlockKind, EthApiError, PendingBlock,
23 PendingBlockEnv, PendingBlockEnvOrigin,
24};
25use reth_storage_api::{
26 noop::NoopProvider, BlockReader, BlockReaderIdExt, ProviderHeader, ProviderTx,
27 StateProviderBox, StateProviderFactory,
28};
29use reth_transaction_pool::{
30 error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes,
31 PoolTransaction, TransactionPool,
32};
33use revm::context_interface::Block;
34use std::{
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::sync::Mutex;
39use tracing::debug;
40
41pub trait LoadPendingBlock:
45 EthApiTypes<
46 Error: FromEvmError<Self::Evm>,
47 RpcConvert: RpcConvert<Network = Self::NetworkTypes>,
48 > + RpcNodeCore
49{
50 fn pending_block(&self) -> &Mutex<Option<PendingBlock<Self::Primitives>>>;
54
55 fn pending_env_builder(&self) -> &dyn PendingEnvBuilder<Self::Evm>;
57
58 fn pending_block_kind(&self) -> PendingBlockKind;
60
61 fn pending_block_env_and_cfg(&self) -> Result<PendingBlockEnv<Self::Evm>, Self::Error> {
65 if let Some((block, receipts)) =
66 self.provider().pending_block_and_receipts().map_err(Self::Error::from_eth_err)?
67 {
68 let evm_env = self
72 .evm_config()
73 .evm_env(block.header())
74 .map_err(RethError::other)
75 .map_err(Self::Error::from_eth_err)?;
76
77 return Ok(PendingBlockEnv::new(
78 evm_env,
79 PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
80 ));
81 }
82
83 let latest = self
86 .provider()
87 .latest_header()
88 .map_err(Self::Error::from_eth_err)?
89 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
90
91 let evm_env = self
92 .evm_config()
93 .next_evm_env(&latest, &self.next_env_attributes(&latest)?)
94 .map_err(RethError::other)
95 .map_err(Self::Error::from_eth_err)?;
96
97 Ok(PendingBlockEnv::new(evm_env, PendingBlockEnvOrigin::DerivedFromLatest(latest)))
98 }
99
100 fn next_env_attributes(
102 &self,
103 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
104 ) -> Result<<Self::Evm as ConfigureEvm>::NextBlockEnvCtx, Self::Error> {
105 Ok(self.pending_env_builder().pending_env_attributes(parent)?)
106 }
107
108 fn local_pending_state(
110 &self,
111 ) -> impl Future<Output = Result<Option<StateProviderBox>, Self::Error>> + Send
112 where
113 Self: SpawnBlocking,
114 {
115 async move {
116 let Some(pending_block) = self.pool_pending_block().await? else {
117 return Ok(None);
118 };
119
120 let latest_historical = self
121 .provider()
122 .history_by_block_hash(pending_block.block().parent_hash())
123 .map_err(Self::Error::from_eth_err)?;
124
125 let state = BlockState::from(pending_block);
126
127 Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
128 }
129 }
130
131 fn pool_pending_block(
133 &self,
134 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
135 where
136 Self: SpawnBlocking,
137 {
138 async move {
139 if self.pending_block_kind().is_none() {
140 return Ok(None);
141 }
142 let pending = self.pending_block_env_and_cfg()?;
143 let parent = match pending.origin {
144 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
145 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
146 };
147
148 self.build_pool_pending_block(parent, pending.evm_env).await
149 }
150 }
151
152 fn build_pool_pending_block(
157 &self,
158 parent: SealedHeader<ProviderHeader<Self::Provider>>,
159 evm_env: EvmEnvFor<Self::Evm>,
160 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
161 where
162 Self: SpawnBlocking,
163 {
164 async move {
165 let mut lock = self.pending_block().lock().await;
167
168 let now = Instant::now();
169
170 if let Some(pending_block) = lock.as_ref() {
172 if evm_env.block_env.number() == U256::from(pending_block.block().number()) &&
174 parent.hash() == pending_block.block().parent_hash() &&
175 now <= pending_block.expires_at
176 {
177 return Ok(Some(pending_block.clone()));
178 }
179 }
180
181 let executed_block = match self
182 .spawn_blocking_io(move |this| {
183 this.build_block(&parent)
185 })
186 .await
187 {
188 Ok(block) => block,
189 Err(err) => {
190 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
191 return Ok(None)
192 }
193 };
194
195 let pending = PendingBlock::with_executed_block(
196 Instant::now() + Duration::from_secs(1),
197 executed_block,
198 );
199
200 *lock = Some(pending.clone());
201
202 Ok(Some(pending))
203 }
204 }
205
206 fn local_pending_block(
208 &self,
209 ) -> impl Future<Output = Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error>> + Send
210 where
211 Self: SpawnBlocking,
212 Self::Pool:
213 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
214 {
215 async move {
216 if self.pending_block_kind().is_none() {
217 return Ok(None);
218 }
219
220 let pending = self.pending_block_env_and_cfg()?;
221
222 Ok(match pending.origin {
223 PendingBlockEnvOrigin::ActualPending(block, receipts) => {
224 Some(BlockAndReceipts { block, receipts })
225 }
226 PendingBlockEnvOrigin::DerivedFromLatest(parent) => self
227 .build_pool_pending_block(parent, pending.evm_env)
228 .await?
229 .map(PendingBlock::into_block_and_receipts),
230 })
231 }
232 }
233
234 fn build_block(
242 &self,
243 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
244 ) -> Result<ExecutedBlock<Self::Primitives>, Self::Error>
245 where
246 Self::Pool:
247 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
248 EthApiError: From<ProviderError>,
249 {
250 let state_provider = self
251 .provider()
252 .history_by_block_hash(parent.hash())
253 .map_err(Self::Error::from_eth_err)?;
254 let state = StateProviderDatabase::new(state_provider);
255 let mut db = State::builder().with_database(state).with_bundle_update().build();
256
257 let mut builder = self
258 .evm_config()
259 .builder_for_next_block(&mut db, parent, self.next_env_attributes(parent)?)
260 .map_err(RethError::other)
261 .map_err(Self::Error::from_eth_err)?;
262
263 builder.apply_pre_execution_changes().map_err(Self::Error::from_eth_err)?;
264
265 let block_gas_limit: u64 = builder.evm().block().gas_limit();
266 let basefee = builder.evm().block().basefee();
267 let blob_gasprice = builder.evm().block().blob_gasprice().map(|p| p as u64);
268
269 let blob_params = self
270 .provider()
271 .chain_spec()
272 .blob_params_at_timestamp(parent.timestamp())
273 .unwrap_or_else(BlobParams::cancun);
274 let mut cumulative_gas_used = 0;
275 let mut sum_blob_gas_used = 0;
276
277 if !self.pending_block_kind().is_empty() {
279 let mut best_txs = self
280 .pool()
281 .best_transactions_with_attributes(BestTransactionsAttributes::new(
282 basefee,
283 blob_gasprice,
284 ))
285 .without_updates();
287
288 while let Some(pool_tx) = best_txs.next() {
289 if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
291 best_txs.mark_invalid(
295 &pool_tx,
296 &InvalidPoolTransactionError::ExceedsGasLimit(
297 pool_tx.gas_limit(),
298 block_gas_limit,
299 ),
300 );
301 continue
302 }
303
304 if pool_tx.origin.is_private() {
305 best_txs.mark_invalid(
309 &pool_tx,
310 &InvalidPoolTransactionError::Consensus(
311 InvalidTransactionError::TxTypeNotSupported,
312 ),
313 );
314 continue
315 }
316
317 let tx = pool_tx.to_consensus();
319
320 let tx_blob_gas = tx.blob_gas_used();
323 if let Some(tx_blob_gas) = tx_blob_gas &&
324 sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
325 {
326 best_txs.mark_invalid(
331 &pool_tx,
332 &InvalidPoolTransactionError::ExceedsGasLimit(
333 tx_blob_gas,
334 blob_params.max_blob_gas_per_block(),
335 ),
336 );
337 continue
338 }
339
340 let gas_used = match builder.execute_transaction(tx) {
341 Ok(gas_used) => gas_used,
342 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
343 error,
344 ..
345 })) => {
346 if error.is_nonce_too_low() {
347 } else {
349 best_txs.mark_invalid(
352 &pool_tx,
353 &InvalidPoolTransactionError::Consensus(
354 InvalidTransactionError::TxTypeNotSupported,
355 ),
356 );
357 }
358 continue
359 }
360 Err(err) => return Err(Self::Error::from_eth_err(err)),
362 };
363
364 if let Some(tx_blob_gas) = tx_blob_gas {
366 sum_blob_gas_used += tx_blob_gas;
367
368 if sum_blob_gas_used == blob_params.max_blob_gas_per_block() {
370 best_txs.skip_blobs();
371 }
372 }
373
374 cumulative_gas_used += gas_used;
377 }
378 }
379
380 let BlockBuilderOutcome { execution_result, block, hashed_state, trie_updates } =
381 builder.finish(NoopProvider::default()).map_err(Self::Error::from_eth_err)?;
382
383 let execution_outcome =
384 BlockExecutionOutput { state: db.take_bundle(), result: execution_result };
385
386 Ok(ExecutedBlock::new(
387 block.into(),
388 Arc::new(execution_outcome),
389 ComputedTrieData::without_trie_input(
390 Arc::new(hashed_state.into_sorted()),
391 Arc::new(trie_updates.into_sorted()),
392 ),
393 ))
394 }
395}
396
/// Produces the next-block environment attributes used when building a pending block.
pub trait PendingEnvBuilder<Evm: ConfigureEvm>: Send + Sync + Unpin + 'static {
    /// Returns the attributes for the block that follows `parent`.
    ///
    /// # Errors
    ///
    /// Implementations may fail with an [`EthApiError`].
    fn pending_env_attributes(
        &self,
        parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
    ) -> Result<Evm::NextBlockEnvCtx, EthApiError>;
}
405
/// Infallible construction of next-block environment attributes from a parent header.
pub trait BuildPendingEnv<Header> {
    /// Builds the attributes for the block that follows `parent`.
    fn build_pending_env(parent: &SealedHeader<Header>) -> Self;
}
415
416impl<Evm> PendingEnvBuilder<Evm> for ()
417where
418 Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<Evm::Primitives>>>,
419{
420 fn pending_env_attributes(
421 &self,
422 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
423 ) -> Result<Evm::NextBlockEnvCtx, EthApiError> {
424 Ok(Evm::NextBlockEnvCtx::build_pending_env(parent))
425 }
426}
427
428impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
429 fn build_pending_env(parent: &SealedHeader<H>) -> Self {
430 Self {
431 timestamp: parent.timestamp().saturating_add(12),
432 suggested_fee_recipient: parent.beneficiary(),
433 prev_randao: B256::random(),
434 gas_limit: parent.gas_limit(),
435 parent_beacon_block_root: parent.parent_beacon_block_root(),
436 withdrawals: parent.withdrawals_root().map(|_| Default::default()),
437 extra_data: parent.extra_data().clone(),
438 }
439 }
440}
441
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_primitives_traits::SealedHeader;

    /// The pending env attributes carry over the parent's beacon block root unchanged.
    #[test]
    fn pending_env_keeps_parent_beacon_root() {
        let expected_root = B256::repeat_byte(0x42);
        let parent = SealedHeader::new(
            Header { parent_beacon_block_root: Some(expected_root), ..Default::default() },
            B256::ZERO,
        );

        let attrs = NextBlockEnvAttributes::build_pending_env(&parent);

        assert_eq!(attrs.parent_beacon_block_root, Some(expected_root));
    }
}