reth_optimism_flashblocks/
service.rs1use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
2use alloy_eips::BlockNumberOrTag;
3use alloy_primitives::B256;
4use futures_util::{FutureExt, Stream, StreamExt};
5use reth_chain_state::{
6 CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock,
7};
8use reth_errors::RethError;
9use reth_evm::{
10 execute::{BlockBuilder, BlockBuilderOutcome},
11 ConfigureEvm,
12};
13use reth_execution_types::ExecutionOutcome;
14use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
15use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
16use reth_rpc_eth_types::{EthApiError, PendingBlock};
17use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
18use std::{
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22 time::{Duration, Instant},
23};
24use tokio::pin;
25use tracing::{debug, trace, warn};
26
27#[derive(Debug)]
30pub struct FlashBlockService<
31 N: NodePrimitives,
32 S,
33 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: Unpin>,
34 Provider,
35> {
36 rx: S,
37 current: Option<PendingBlock<N>>,
38 blocks: FlashBlockSequence<N::SignedTx>,
39 rebuild: bool,
40 evm_config: EvmConfig,
41 provider: Provider,
42 canon_receiver: CanonStateNotifications<N>,
43 cached_state: Option<(B256, CachedReads)>,
48}
49
50impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
51where
52 N: NodePrimitives,
53 S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
54 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
55 Provider: StateProviderFactory
56 + CanonStateSubscriptions<Primitives = N>
57 + BlockReaderIdExt<
58 Header = HeaderTy<N>,
59 Block = BlockTy<N>,
60 Transaction = N::SignedTx,
61 Receipt = ReceiptTy<N>,
62 > + Unpin,
63{
64 pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
66 Self {
67 rx,
68 current: None,
69 blocks: FlashBlockSequence::new(),
70 evm_config,
71 canon_receiver: provider.subscribe_to_canonical_state(),
72 provider,
73 cached_state: None,
74 rebuild: false,
75 }
76 }
77
78 pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingBlock<N>>>) {
82 while let Some(block) = self.next().await {
83 if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
84 let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
85 }
86 }
87
88 warn!("Flashblock service has stopped");
89 }
90
91 fn cached_reads(&mut self, head: B256) -> CachedReads {
95 if let Some((tracked, cache)) = self.cached_state.take() {
96 if tracked == head {
97 return cache
98 }
99 }
100
101 CachedReads::default()
103 }
104
105 fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
107 self.cached_state = Some((head, cached_reads));
108 }
109
110 fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
114 trace!("Attempting new flashblock");
115
116 let latest = self
117 .provider
118 .latest_header()?
119 .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
120 let latest_hash = latest.hash();
121
122 let Some(attrs) = self.blocks.payload_base() else {
123 trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
124 return Ok(None)
125 };
126
127 if attrs.parent_hash != latest_hash {
128 trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
129 return Ok(None)
131 }
132
133 let state_provider = self.provider.history_by_block_hash(latest.hash())?;
134
135 let mut request_cache = self.cached_reads(latest_hash);
136 let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
137 let mut state = State::builder().with_database(cached_db).with_bundle_update().build();
138
139 let mut builder = self
140 .evm_config
141 .builder_for_next_block(&mut state, &latest, attrs.into())
142 .map_err(RethError::other)?;
143
144 builder.apply_pre_execution_changes()?;
145
146 for tx in self.blocks.ready_transactions() {
147 let _gas_used = builder.execute_transaction(tx)?;
148 }
149
150 let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
151 builder.finish(NoopProvider::default())?;
152
153 let execution_outcome = ExecutionOutcome::new(
154 state.take_bundle(),
155 vec![execution_result.receipts],
156 block.number(),
157 vec![execution_result.requests],
158 );
159
160 self.update_cached_reads(latest_hash, request_cache);
162
163 Ok(Some(PendingBlock::with_executed_block(
164 Instant::now() + Duration::from_secs(1),
165 ExecutedBlock {
166 recovered_block: block.into(),
167 execution_output: Arc::new(execution_outcome),
168 hashed_state: Arc::new(hashed_state),
169 },
170 )))
171 }
172
173 fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingBlock<N>> {
175 let latest = state.tip_checked()?.hash();
176 self.current.take_if(|current| current.parent_hash() != latest)
177 }
178}
179
180impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
181where
182 N: NodePrimitives,
183 S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
184 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
185 Provider: StateProviderFactory
186 + CanonStateSubscriptions<Primitives = N>
187 + BlockReaderIdExt<
188 Header = HeaderTy<N>,
189 Block = BlockTy<N>,
190 Transaction = N::SignedTx,
191 Receipt = ReceiptTy<N>,
192 > + Unpin,
193{
194 type Item = eyre::Result<Option<PendingBlock<N>>>;
195
196 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197 let this = self.get_mut();
198
199 while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
201 match result {
202 Ok(flashblock) => match this.blocks.insert(flashblock) {
203 Ok(_) => this.rebuild = true,
204 Err(err) => debug!(%err, "Failed to prepare flashblock"),
205 },
206 Err(err) => return Poll::Ready(Some(Err(err))),
207 }
208 }
209
210 if let Poll::Ready(Ok(state)) = {
211 let fut = this.canon_receiver.recv();
212 pin!(fut);
213 fut.poll_unpin(cx)
214 } {
215 if let Some(current) = this.on_new_tip(state) {
216 trace!(
217 parent_hash = %current.block().parent_hash(),
218 block_number = current.block().number(),
219 "Clearing current flashblock on new canonical block"
220 );
221
222 return Poll::Ready(Some(Ok(None)))
223 }
224 }
225
226 if !this.rebuild && this.current.is_some() {
227 return Poll::Pending
228 }
229
230 let now = Instant::now();
231 match this.execute() {
233 Ok(Some(new_pending)) => {
234 this.current = Some(new_pending.clone());
236 this.rebuild = false;
237 trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks");
238 return Poll::Ready(Some(Ok(Some(new_pending))));
239 }
240 Ok(None) => {
241 }
243 Err(err) => {
244 debug!(%err, "failed to execute flashblock");
246 }
247 }
248
249 Poll::Pending
250 }
251}