reth_optimism_flashblocks/
service.rs1use crate::{
2 sequence::FlashBlockPendingSequence,
3 worker::{BuildArgs, FlashBlockBuilder},
4 ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx,
5 PendingFlashBlock,
6};
7use alloy_eips::eip2718::WithEncoded;
8use alloy_primitives::B256;
9use futures_util::{FutureExt, Stream, StreamExt};
10use metrics::Histogram;
11use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions};
12use reth_evm::ConfigureEvm;
13use reth_metrics::Metrics;
14use reth_primitives_traits::{
15 AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
16};
17use reth_revm::cached::CachedReads;
18use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
19use reth_tasks::TaskExecutor;
20use std::{
21 pin::Pin,
22 task::{ready, Context, Poll},
23 time::Instant,
24};
25use tokio::{
26 pin,
27 sync::{oneshot, watch},
28};
29use tracing::{debug, trace, warn};
30
31pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9;
32
33#[derive(Debug)]
36pub struct FlashBlockService<
37 N: NodePrimitives,
38 S,
39 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: Unpin>,
40 Provider,
41> {
42 rx: S,
43 current: Option<PendingFlashBlock<N>>,
44 blocks: FlashBlockPendingSequence<N::SignedTx>,
45 rebuild: bool,
46 builder: FlashBlockBuilder<EvmConfig, Provider>,
47 canon_receiver: CanonStateNotifications<N>,
48 spawner: TaskExecutor,
49 job: Option<BuildJob<N>>,
50 cached_state: Option<(B256, CachedReads)>,
55 in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
57 metrics: FlashBlockServiceMetrics,
59 compute_state_root: bool,
61}
62
63#[derive(Debug, Clone, Copy)]
65pub struct FlashBlockBuildInfo {
66 pub parent_hash: B256,
68 pub index: u64,
70 pub block_number: u64,
72}
73
74impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
75where
76 N: NodePrimitives,
77 S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
78 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
79 + Clone
80 + 'static,
81 Provider: StateProviderFactory
82 + CanonStateSubscriptions<Primitives = N>
83 + BlockReaderIdExt<
84 Header = HeaderTy<N>,
85 Block = BlockTy<N>,
86 Transaction = N::SignedTx,
87 Receipt = ReceiptTy<N>,
88 > + Unpin
89 + Clone
90 + 'static,
91{
92 pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
94 let (in_progress_tx, _) = watch::channel(None);
95 Self {
96 rx,
97 current: None,
98 blocks: FlashBlockPendingSequence::new(),
99 canon_receiver: provider.subscribe_to_canonical_state(),
100 builder: FlashBlockBuilder::new(evm_config, provider),
101 rebuild: false,
102 spawner,
103 job: None,
104 cached_state: None,
105 in_progress_tx,
106 metrics: FlashBlockServiceMetrics::default(),
107 compute_state_root: false,
108 }
109 }
110
111 pub const fn compute_state_root(mut self, enable_state_root: bool) -> Self {
113 self.compute_state_root = enable_state_root;
114 self
115 }
116
117 pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
119 self.blocks.subscribe_block_sequence()
120 }
121
122 pub fn subscribe_in_progress(&self) -> InProgressFlashBlockRx {
124 self.in_progress_tx.subscribe()
125 }
126
127 pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingFlashBlock<N>>>) {
131 while let Some(block) = self.next().await {
132 if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
133 let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
134 }
135 }
136
137 warn!("Flashblock service has stopped");
138 }
139
140 fn build_args(
144 &mut self,
145 ) -> Option<
146 BuildArgs<
147 impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>
148 + use<N, S, EvmConfig, Provider>,
149 >,
150 > {
151 let Some(base) = self.blocks.payload_base() else {
152 trace!(
153 flashblock_number = ?self.blocks.block_number(),
154 count = %self.blocks.count(),
155 "Missing flashblock payload base"
156 );
157
158 return None
159 };
160
161 if let Some(latest) = self.builder.provider().latest_header().ok().flatten() &&
163 latest.hash() != base.parent_hash
164 {
165 trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
166 return None
167 }
168
169 let Some(last_flashblock) = self.blocks.last_flashblock() else {
170 trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing last flashblock");
171 return None
172 };
173
174 let compute_state_root =
176 self.compute_state_root && self.blocks.index() >= Some(FB_STATE_ROOT_FROM_INDEX as u64);
177
178 Some(BuildArgs {
179 base,
180 transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
181 cached_state: self.cached_state.take(),
182 last_flashblock_index: last_flashblock.index,
183 last_flashblock_hash: last_flashblock.diff.block_hash,
184 compute_state_root,
185 })
186 }
187
188 fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingFlashBlock<N>> {
190 let tip = state.tip_checked()?;
191 let tip_hash = tip.hash();
192 let current = self.current.take_if(|current| current.parent_hash() != tip_hash);
193
194 let mut cached = CachedReads::default();
196 let committed = state.committed();
197 let new_execution_outcome = committed.execution_outcome();
198 for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
199 if let Some(info) = acc.info.clone() {
200 let storage =
202 acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
203 cached.insert_account(addr, info, storage);
204 }
205 }
206 self.cached_state = Some((tip_hash, cached));
207
208 current
209 }
210}
211
212impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
213where
214 N: NodePrimitives,
215 S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
216 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
217 + Clone
218 + 'static,
219 Provider: StateProviderFactory
220 + CanonStateSubscriptions<Primitives = N>
221 + BlockReaderIdExt<
222 Header = HeaderTy<N>,
223 Block = BlockTy<N>,
224 Transaction = N::SignedTx,
225 Receipt = ReceiptTy<N>,
226 > + Unpin
227 + Clone
228 + 'static,
229{
230 type Item = eyre::Result<Option<PendingFlashBlock<N>>>;
231
232 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
233 let this = self.get_mut();
234
235 loop {
236 let result = match this.job.as_mut() {
238 Some((now, rx)) => {
239 let result = ready!(rx.poll_unpin(cx));
240 result.ok().map(|res| (*now, res))
241 }
242 None => None,
243 };
244 this.job.take();
246 let _ = this.in_progress_tx.send(None);
248
249 if let Some((now, result)) = result {
250 match result {
251 Ok(Some((new_pending, cached_reads))) => {
252 this.blocks.set_state_root(new_pending.computed_state_root());
254
255 this.current = Some(new_pending.clone());
257 this.cached_state = Some((new_pending.parent_hash(), cached_reads));
259 this.rebuild = false;
260
261 let elapsed = now.elapsed();
262 this.metrics.execution_duration.record(elapsed.as_secs_f64());
263 trace!(
264 parent_hash = %new_pending.block().parent_hash(),
265 block_number = new_pending.block().number(),
266 flash_blocks = this.blocks.count(),
267 ?elapsed,
268 "Built new block with flashblocks"
269 );
270
271 return Poll::Ready(Some(Ok(Some(new_pending))));
272 }
273 Ok(None) => {
274 }
276 Err(err) => {
277 debug!(%err, "failed to execute flashblock");
279 }
280 }
281 }
282
283 while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
285 match result {
286 Ok(flashblock) => {
287 if flashblock.index == 0 {
288 this.metrics.last_flashblock_length.record(this.blocks.count() as f64);
289 }
290 match this.blocks.insert(flashblock) {
291 Ok(_) => this.rebuild = true,
292 Err(err) => debug!(%err, "Failed to prepare flashblock"),
293 }
294 }
295 Err(err) => return Poll::Ready(Some(Err(err))),
296 }
297 }
298
299 if let Poll::Ready(Ok(state)) = {
301 let fut = this.canon_receiver.recv();
302 pin!(fut);
303 fut.poll_unpin(cx)
304 } && let Some(current) = this.on_new_tip(state)
305 {
306 trace!(
307 parent_hash = %current.block().parent_hash(),
308 block_number = current.block().number(),
309 "Clearing current flashblock on new canonical block"
310 );
311
312 return Poll::Ready(Some(Ok(None)))
313 }
314
315 if !this.rebuild && this.current.is_some() {
316 return Poll::Pending
317 }
318
319 if let Some(args) = this.build_args() {
321 let now = Instant::now();
322
323 let fb_info = FlashBlockBuildInfo {
324 parent_hash: args.base.parent_hash,
325 index: args.last_flashblock_index,
326 block_number: args.base.block_number,
327 };
328 let _ = this.in_progress_tx.send(Some(fb_info));
330 let (tx, rx) = oneshot::channel();
331 let builder = this.builder.clone();
332
333 this.spawner.spawn_blocking(async move {
334 let _ = tx.send(builder.execute(args));
335 });
336 this.job.replace((now, rx));
337
338 continue
340 }
341
342 return Poll::Pending
343 }
344 }
345}
346
347type BuildJob<N> =
348 (Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);
349
350#[derive(Metrics)]
351#[metrics(scope = "flashblock_service")]
352struct FlashBlockServiceMetrics {
353 last_flashblock_length: Histogram,
355 execution_duration: Histogram,
357}