reth_rpc_eth_api/helpers/
pending_block.rs1use super::SpawnBlocking;
5use crate::{EthApiTypes, FromEthApiError, FromEvmError, RpcNodeCore};
6use alloy_consensus::{BlockHeader, Transaction};
7use alloy_eips::eip7840::BlobParams;
8use alloy_primitives::{B256, U256};
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use futures::Future;
11use reth_chain_state::{BlockState, ComputedTrieData, ExecutedBlock};
12use reth_chainspec::{ChainSpecProvider, EthChainSpec};
13use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError, RethError};
14use reth_evm::{
15 execute::{BlockBuilder, BlockBuilderOutcome, BlockExecutionOutput},
16 ConfigureEvm, Evm, NextBlockEnvAttributes,
17};
18use reth_primitives_traits::{transaction::error::InvalidTransactionError, HeaderTy, SealedHeader};
19use reth_revm::{database::StateProviderDatabase, db::State};
20use reth_rpc_convert::RpcConvert;
21use reth_rpc_eth_types::{
22 block::BlockAndReceipts, builder::config::PendingBlockKind, EthApiError, PendingBlock,
23 PendingBlockEnv, PendingBlockEnvOrigin,
24};
25use reth_storage_api::{
26 noop::NoopProvider, BlockReader, BlockReaderIdExt, ProviderHeader, ProviderTx, ReceiptProvider,
27 StateProviderBox, StateProviderFactory,
28};
29use reth_transaction_pool::{
30 error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes,
31 PoolTransaction, TransactionPool,
32};
33use revm::context_interface::Block;
34use std::{
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::sync::Mutex;
39use tracing::debug;
40
41pub trait LoadPendingBlock:
45 EthApiTypes<
46 Error: FromEvmError<Self::Evm>,
47 RpcConvert: RpcConvert<Network = Self::NetworkTypes>,
48 > + RpcNodeCore
49{
50 fn pending_block(&self) -> &Mutex<Option<PendingBlock<Self::Primitives>>>;
54
55 fn pending_env_builder(&self) -> &dyn PendingEnvBuilder<Self::Evm>;
57
58 fn pending_block_kind(&self) -> PendingBlockKind;
60
61 fn pending_block_env_and_cfg(&self) -> Result<PendingBlockEnv<Self::Evm>, Self::Error> {
65 if let Some(block) = self.provider().pending_block().map_err(Self::Error::from_eth_err)? &&
66 let Some(receipts) = self
67 .provider()
68 .receipts_by_block(block.hash().into())
69 .map_err(Self::Error::from_eth_err)?
70 {
71 let evm_env = self
75 .evm_config()
76 .evm_env(block.header())
77 .map_err(RethError::other)
78 .map_err(Self::Error::from_eth_err)?;
79
80 return Ok(PendingBlockEnv::new(
81 evm_env,
82 PendingBlockEnvOrigin::ActualPending(Arc::new(block), Arc::new(receipts)),
83 ));
84 }
85
86 let latest = self
89 .provider()
90 .latest_header()
91 .map_err(Self::Error::from_eth_err)?
92 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
93
94 let evm_env = self
95 .evm_config()
96 .next_evm_env(&latest, &self.next_env_attributes(&latest)?)
97 .map_err(RethError::other)
98 .map_err(Self::Error::from_eth_err)?;
99
100 Ok(PendingBlockEnv::new(evm_env, PendingBlockEnvOrigin::DerivedFromLatest(latest)))
101 }
102
103 fn next_env_attributes(
105 &self,
106 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
107 ) -> Result<<Self::Evm as ConfigureEvm>::NextBlockEnvCtx, Self::Error> {
108 Ok(self.pending_env_builder().pending_env_attributes(parent)?)
109 }
110
111 fn local_pending_state(
113 &self,
114 ) -> impl Future<Output = Result<Option<StateProviderBox>, Self::Error>> + Send
115 where
116 Self: SpawnBlocking,
117 {
118 async move {
119 let Some(pending_block) = self.pool_pending_block().await? else {
120 return Ok(None);
121 };
122
123 let latest_historical = self
124 .provider()
125 .history_by_block_hash(pending_block.block().parent_hash())
126 .map_err(Self::Error::from_eth_err)?;
127
128 let state = BlockState::from(pending_block);
129
130 Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox))
131 }
132 }
133
134 fn pool_pending_block(
136 &self,
137 ) -> impl Future<Output = Result<Option<PendingBlock<Self::Primitives>>, Self::Error>> + Send
138 where
139 Self: SpawnBlocking,
140 {
141 async move {
142 if self.pending_block_kind().is_none() {
143 return Ok(None);
144 }
145 let pending = self.pending_block_env_and_cfg()?;
146 let parent = match pending.origin {
147 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
148 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
149 };
150
151 let mut lock = self.pending_block().lock().await;
153
154 let now = Instant::now();
155
156 if let Some(pending_block) = lock.as_ref() {
158 if pending.evm_env.block_env.number() == U256::from(pending_block.block().number()) &&
160 parent.hash() == pending_block.block().parent_hash() &&
161 now <= pending_block.expires_at
162 {
163 return Ok(Some(pending_block.clone()));
164 }
165 }
166
167 let executed_block = match self
168 .spawn_blocking_io(move |this| {
169 this.build_block(&parent)
171 })
172 .await
173 {
174 Ok(block) => block,
175 Err(err) => {
176 debug!(target: "rpc", "Failed to build pending block: {:?}", err);
177 return Ok(None)
178 }
179 };
180
181 let pending = PendingBlock::with_executed_block(
182 Instant::now() + Duration::from_secs(1),
183 executed_block,
184 );
185
186 *lock = Some(pending.clone());
187
188 Ok(Some(pending))
189 }
190 }
191
192 fn local_pending_block(
194 &self,
195 ) -> impl Future<Output = Result<Option<BlockAndReceipts<Self::Primitives>>, Self::Error>> + Send
196 where
197 Self: SpawnBlocking,
198 Self::Pool:
199 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
200 {
201 async move {
202 if self.pending_block_kind().is_none() {
203 return Ok(None);
204 }
205
206 let pending = self.pending_block_env_and_cfg()?;
207
208 Ok(match pending.origin {
209 PendingBlockEnvOrigin::ActualPending(block, receipts) => {
210 Some(BlockAndReceipts { block, receipts })
211 }
212 PendingBlockEnvOrigin::DerivedFromLatest(..) => {
213 self.pool_pending_block().await?.map(PendingBlock::into_block_and_receipts)
214 }
215 })
216 }
217 }
218
219 fn build_block(
227 &self,
228 parent: &SealedHeader<ProviderHeader<Self::Provider>>,
229 ) -> Result<ExecutedBlock<Self::Primitives>, Self::Error>
230 where
231 Self::Pool:
232 TransactionPool<Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>>,
233 EthApiError: From<ProviderError>,
234 {
235 let state_provider = self
236 .provider()
237 .history_by_block_hash(parent.hash())
238 .map_err(Self::Error::from_eth_err)?;
239 let state = StateProviderDatabase::new(state_provider);
240 let mut db = State::builder().with_database(state).with_bundle_update().build();
241
242 let mut builder = self
243 .evm_config()
244 .builder_for_next_block(&mut db, parent, self.next_env_attributes(parent)?)
245 .map_err(RethError::other)
246 .map_err(Self::Error::from_eth_err)?;
247
248 builder.apply_pre_execution_changes().map_err(Self::Error::from_eth_err)?;
249
250 let block_env = builder.evm_mut().block().clone();
251
252 let blob_params = self
253 .provider()
254 .chain_spec()
255 .blob_params_at_timestamp(parent.timestamp())
256 .unwrap_or_else(BlobParams::cancun);
257 let mut cumulative_gas_used = 0;
258 let mut sum_blob_gas_used = 0;
259 let block_gas_limit: u64 = block_env.gas_limit();
260
261 if !self.pending_block_kind().is_empty() {
263 let mut best_txs = self
264 .pool()
265 .best_transactions_with_attributes(BestTransactionsAttributes::new(
266 block_env.basefee(),
267 block_env.blob_gasprice().map(|gasprice| gasprice as u64),
268 ))
269 .without_updates();
271
272 while let Some(pool_tx) = best_txs.next() {
273 if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
275 best_txs.mark_invalid(
279 &pool_tx,
280 &InvalidPoolTransactionError::ExceedsGasLimit(
281 pool_tx.gas_limit(),
282 block_gas_limit,
283 ),
284 );
285 continue
286 }
287
288 if pool_tx.origin.is_private() {
289 best_txs.mark_invalid(
293 &pool_tx,
294 &InvalidPoolTransactionError::Consensus(
295 InvalidTransactionError::TxTypeNotSupported,
296 ),
297 );
298 continue
299 }
300
301 let tx = pool_tx.to_consensus();
303
304 if let Some(tx_blob_gas) = tx.blob_gas_used() &&
307 sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
308 {
309 best_txs.mark_invalid(
314 &pool_tx,
315 &InvalidPoolTransactionError::ExceedsGasLimit(
316 tx_blob_gas,
317 blob_params.max_blob_gas_per_block(),
318 ),
319 );
320 continue
321 }
322
323 let gas_used = match builder.execute_transaction(tx.clone()) {
324 Ok(gas_used) => gas_used,
325 Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
326 error,
327 ..
328 })) => {
329 if error.is_nonce_too_low() {
330 } else {
332 best_txs.mark_invalid(
335 &pool_tx,
336 &InvalidPoolTransactionError::Consensus(
337 InvalidTransactionError::TxTypeNotSupported,
338 ),
339 );
340 }
341 continue
342 }
343 Err(err) => return Err(Self::Error::from_eth_err(err)),
345 };
346
347 if let Some(tx_blob_gas) = tx.blob_gas_used() {
349 sum_blob_gas_used += tx_blob_gas;
350
351 if sum_blob_gas_used == blob_params.max_blob_gas_per_block() {
353 best_txs.skip_blobs();
354 }
355 }
356
357 cumulative_gas_used += gas_used;
360 }
361 }
362
363 let BlockBuilderOutcome { execution_result, block, hashed_state, trie_updates } =
364 builder.finish(NoopProvider::default()).map_err(Self::Error::from_eth_err)?;
365
366 let execution_outcome =
367 BlockExecutionOutput { state: db.take_bundle(), result: execution_result };
368
369 Ok(ExecutedBlock::new(
370 block.into(),
371 Arc::new(execution_outcome),
372 ComputedTrieData::without_trie_input(
373 Arc::new(hashed_state.into_sorted()),
374 Arc::new(trie_updates.into_sorted()),
375 ),
376 ))
377 }
378}
379
380pub trait PendingEnvBuilder<Evm: ConfigureEvm>: Send + Sync + Unpin + 'static {
382 fn pending_env_attributes(
384 &self,
385 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
386 ) -> Result<Evm::NextBlockEnvCtx, EthApiError>;
387}
388
389pub trait BuildPendingEnv<Header> {
395 fn build_pending_env(parent: &SealedHeader<Header>) -> Self;
397}
398
399impl<Evm> PendingEnvBuilder<Evm> for ()
400where
401 Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<Evm::Primitives>>>,
402{
403 fn pending_env_attributes(
404 &self,
405 parent: &SealedHeader<HeaderTy<Evm::Primitives>>,
406 ) -> Result<Evm::NextBlockEnvCtx, EthApiError> {
407 Ok(Evm::NextBlockEnvCtx::build_pending_env(parent))
408 }
409}
410
411impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
412 fn build_pending_env(parent: &SealedHeader<H>) -> Self {
413 Self {
414 timestamp: parent.timestamp().saturating_add(12),
415 suggested_fee_recipient: parent.beneficiary(),
416 prev_randao: B256::random(),
417 gas_limit: parent.gas_limit(),
418 parent_beacon_block_root: parent.parent_beacon_block_root(),
419 withdrawals: parent.withdrawals_root().map(|_| Default::default()),
420 extra_data: parent.extra_data().clone(),
421 }
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_primitives_traits::SealedHeader;

    /// The pending environment must carry over the parent's beacon block root unchanged.
    #[test]
    fn pending_env_keeps_parent_beacon_root() {
        let expected_root = Some(B256::repeat_byte(0x42));
        let parent = SealedHeader::new(
            Header { parent_beacon_block_root: expected_root, ..Default::default() },
            B256::ZERO,
        );

        let attributes = NextBlockEnvAttributes::build_pending_env(&parent);

        assert_eq!(attributes.parent_beacon_block_root, expected_root);
    }
}