reth_optimism_rpc/eth/
ext.rs
1use crate::{error::TxConditionalErr, OpEthApiError, SequencerClient};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumberOrTag;
6use alloy_primitives::{Bytes, StorageKey, B256, U256};
7use alloy_rpc_types_eth::erc4337::{AccountStorage, TransactionConditional};
8use jsonrpsee_core::RpcResult;
9use reth_optimism_txpool::conditional::MaybeConditionalTransaction;
10use reth_rpc_eth_api::L2EthApiExtServer;
11use reth_rpc_eth_types::utils::recover_raw_transaction;
12use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
13use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
14use std::sync::Arc;
15use tokio::sync::Semaphore;
16
/// Largest value of `TransactionConditional::cost()` that `send_raw_transaction_conditional`
/// accepts; anything above it is rejected with `TxConditionalErr::ConditionalCostExceeded`.
const MAX_CONDITIONAL_EXECUTION_COST: u64 = 5_000;

/// Number of permits handed to the validation semaphore, i.e. how many known-account
/// validations may hold a permit at the same time.
const MAX_CONCURRENT_CONDITIONAL_VALIDATIONS: usize = 3;
21
22#[derive(Clone, Debug)]
26pub struct OpEthExtApi<Pool, Provider> {
27 sequencer_client: Option<SequencerClient>,
30 inner: Arc<OpEthExtApiInner<Pool, Provider>>,
31}
32
33impl<Pool, Provider> OpEthExtApi<Pool, Provider>
34where
35 Provider: BlockReaderIdExt + StateProviderFactory + Clone + 'static,
36{
37 pub fn new(sequencer_client: Option<SequencerClient>, pool: Pool, provider: Provider) -> Self {
39 let inner = Arc::new(OpEthExtApiInner::new(pool, provider));
40 Self { sequencer_client, inner }
41 }
42
43 const fn sequencer_client(&self) -> Option<&SequencerClient> {
45 self.sequencer_client.as_ref()
46 }
47
48 #[inline]
49 fn pool(&self) -> &Pool {
50 self.inner.pool()
51 }
52
53 #[inline]
54 fn provider(&self) -> &Provider {
55 self.inner.provider()
56 }
57
58 async fn validate_known_accounts(
60 &self,
61 condition: &TransactionConditional,
62 ) -> Result<(), TxConditionalErr> {
63 if condition.known_accounts.is_empty() {
64 return Ok(());
65 }
66
67 let _permit =
68 self.inner.validation_semaphore.acquire().await.map_err(TxConditionalErr::internal)?;
69
70 let state = self
71 .provider()
72 .state_by_block_number_or_tag(BlockNumberOrTag::Latest)
73 .map_err(TxConditionalErr::internal)?;
74
75 for (address, storage) in &condition.known_accounts {
76 match storage {
77 AccountStorage::Slots(slots) => {
78 for (slot, expected_value) in slots {
79 let current = state
80 .storage(*address, StorageKey::from(*slot))
81 .map_err(TxConditionalErr::internal)?
82 .unwrap_or_default();
83
84 if current != U256::from_be_bytes(**expected_value) {
85 return Err(TxConditionalErr::StorageValueMismatch);
86 }
87 }
88 }
89 AccountStorage::RootHash(expected_root) => {
90 let actual_root = state
91 .storage_root(*address, Default::default())
92 .map_err(TxConditionalErr::internal)?;
93
94 if *expected_root != actual_root {
95 return Err(TxConditionalErr::StorageRootMismatch);
96 }
97 }
98 }
99 }
100
101 Ok(())
102 }
103}
104
105#[async_trait::async_trait]
106impl<Pool, Provider> L2EthApiExtServer for OpEthExtApi<Pool, Provider>
107where
108 Provider: BlockReaderIdExt + StateProviderFactory + Clone + 'static,
109 Pool: TransactionPool<Transaction: MaybeConditionalTransaction> + 'static,
110{
111 async fn send_raw_transaction_conditional(
112 &self,
113 bytes: Bytes,
114 condition: TransactionConditional,
115 ) -> RpcResult<B256> {
116 let cost = condition.cost();
118 if cost > MAX_CONDITIONAL_EXECUTION_COST {
119 return Err(TxConditionalErr::ConditionalCostExceeded.into());
120 }
121
122 let recovered_tx = recover_raw_transaction(&bytes).map_err(|_| {
123 OpEthApiError::Eth(reth_rpc_eth_types::EthApiError::FailedToDecodeSignedTransaction)
124 })?;
125
126 let mut tx = <Pool as TransactionPool>::Transaction::from_pooled(recovered_tx);
127
128 let header_not_found = || {
130 OpEthApiError::Eth(reth_rpc_eth_types::EthApiError::HeaderNotFound(
131 alloy_eips::BlockId::Number(BlockNumberOrTag::Latest),
132 ))
133 };
134 let header = self
135 .provider()
136 .latest_header()
137 .map_err(|_| header_not_found())?
138 .ok_or_else(header_not_found)?;
139
140 if condition.has_exceeded_block_number(header.header().number()) ||
142 condition.has_exceeded_timestamp(header.header().timestamp())
143 {
144 return Err(TxConditionalErr::InvalidCondition.into());
145 }
146
147 self.validate_known_accounts(&condition).await?;
149
150 if let Some(sequencer) = self.sequencer_client() {
151 let _ = sequencer
153 .forward_raw_transaction_conditional(bytes.as_ref(), condition)
154 .await
155 .map_err(OpEthApiError::Sequencer)?;
156 Ok(*tx.hash())
157 } else {
158 tx.set_conditional(condition);
160 let hash =
161 self.pool().add_transaction(TransactionOrigin::Private, tx).await.map_err(|e| {
162 OpEthApiError::Eth(reth_rpc_eth_types::EthApiError::PoolError(e.into()))
163 })?;
164
165 Ok(hash)
166 }
167 }
168}
169
170#[derive(Debug)]
171struct OpEthExtApiInner<Pool, Provider> {
172 pool: Pool,
174 provider: Provider,
176 validation_semaphore: Semaphore,
178}
179
180impl<Pool, Provider> OpEthExtApiInner<Pool, Provider> {
181 fn new(pool: Pool, provider: Provider) -> Self {
182 Self {
183 pool,
184 provider,
185 validation_semaphore: Semaphore::new(MAX_CONCURRENT_CONDITIONAL_VALIDATIONS),
186 }
187 }
188
189 #[inline]
190 const fn pool(&self) -> &Pool {
191 &self.pool
192 }
193
194 #[inline]
195 const fn provider(&self) -> &Provider {
196 &self.provider
197 }
198}