reth_optimism_flashblocks/
service.rs1use crate::{
2 cache::SequenceManager, worker::FlashBlockBuilder, FlashBlock, FlashBlockCompleteSequence,
3 FlashBlockCompleteSequenceRx, InProgressFlashBlockRx, PendingFlashBlock,
4};
5use alloy_primitives::B256;
6use futures_util::{FutureExt, Stream, StreamExt};
7use metrics::{Gauge, Histogram};
8use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
9use reth_evm::ConfigureEvm;
10use reth_metrics::Metrics;
11use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
12use reth_revm::cached::CachedReads;
13use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
14use reth_tasks::TaskExecutor;
15use std::{sync::Arc, time::Instant};
16use tokio::sync::{oneshot, watch};
17use tracing::*;
18
19#[derive(Debug)]
22pub struct FlashBlockService<
23 N: NodePrimitives,
24 S,
25 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<OpFlashblockPayloadBase> + Unpin>,
26 Provider,
27> {
28 incoming_flashblock_rx: S,
30 in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
32 received_flashblocks_tx: tokio::sync::broadcast::Sender<Arc<FlashBlock>>,
34
35 builder: FlashBlockBuilder<EvmConfig, Provider>,
37 spawner: TaskExecutor,
39 job: Option<BuildJob<N>>,
41 sequences: SequenceManager<N::SignedTx>,
43
44 metrics: FlashBlockServiceMetrics,
46}
47
48impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
49where
50 N: NodePrimitives,
51 S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
52 EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<OpFlashblockPayloadBase> + Unpin>
53 + Clone
54 + 'static,
55 Provider: StateProviderFactory
56 + BlockReaderIdExt<
57 Header = HeaderTy<N>,
58 Block = BlockTy<N>,
59 Transaction = N::SignedTx,
60 Receipt = ReceiptTy<N>,
61 > + Unpin
62 + Clone
63 + 'static,
64{
65 pub fn new(
67 incoming_flashblock_rx: S,
68 evm_config: EvmConfig,
69 provider: Provider,
70 spawner: TaskExecutor,
71 compute_state_root: bool,
72 ) -> Self {
73 let (in_progress_tx, _) = watch::channel(None);
74 let (received_flashblocks_tx, _) = tokio::sync::broadcast::channel(128);
75 Self {
76 incoming_flashblock_rx,
77 in_progress_tx,
78 received_flashblocks_tx,
79 builder: FlashBlockBuilder::new(evm_config, provider),
80 spawner,
81 job: None,
82 sequences: SequenceManager::new(compute_state_root),
83 metrics: FlashBlockServiceMetrics::default(),
84 }
85 }
86
87 pub const fn flashblocks_broadcaster(
89 &self,
90 ) -> &tokio::sync::broadcast::Sender<Arc<FlashBlock>> {
91 &self.received_flashblocks_tx
92 }
93
94 pub const fn block_sequence_broadcaster(
96 &self,
97 ) -> &tokio::sync::broadcast::Sender<FlashBlockCompleteSequence> {
98 self.sequences.block_sequence_broadcaster()
99 }
100
101 pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
103 self.sequences.subscribe_block_sequence()
104 }
105
106 pub fn subscribe_in_progress(&self) -> InProgressFlashBlockRx {
108 self.in_progress_tx.subscribe()
109 }
110
111 pub async fn run(mut self, tx: watch::Sender<Option<PendingFlashBlock<N>>>) {
120 loop {
121 tokio::select! {
122 Some(result) = async {
124 match self.job.as_mut() {
125 Some((_, rx)) => rx.await.ok(),
126 None => std::future::pending().await,
127 }
128 } => {
129 let (start_time, _) = self.job.take().unwrap();
130 let _ = self.in_progress_tx.send(None);
131
132 match result {
133 Ok(Some((pending, cached_reads))) => {
134 let parent_hash = pending.parent_hash();
135 self.sequences
136 .on_build_complete(parent_hash, Some((pending.clone(), cached_reads)));
137
138 let elapsed = start_time.elapsed();
139 self.metrics.execution_duration.record(elapsed.as_secs_f64());
140
141 let _ = tx.send(Some(pending));
142 }
143 Ok(None) => {
144 trace!(target: "flashblocks", "Build job returned None");
145 }
146 Err(err) => {
147 warn!(target: "flashblocks", %err, "Build job failed");
148 }
149 }
150 }
151
152 result = self.incoming_flashblock_rx.next() => {
154 match result {
155 Some(Ok(flashblock)) => {
156 self.process_flashblock(flashblock);
158
159 while let Some(result) = self.incoming_flashblock_rx.next().now_or_never().flatten() {
161 match result {
162 Ok(fb) => self.process_flashblock(fb),
163 Err(err) => warn!(target: "flashblocks", %err, "Error receiving flashblock"),
164 }
165 }
166
167 self.try_start_build_job();
168 }
169 Some(Err(err)) => {
170 warn!(target: "flashblocks", %err, "Error receiving flashblock");
171 }
172 None => {
173 warn!(target: "flashblocks", "Flashblock stream ended");
174 break;
175 }
176 }
177 }
178 }
179 }
180 }
181
182 fn process_flashblock(&mut self, flashblock: FlashBlock) {
185 self.notify_received_flashblock(&flashblock);
186
187 if flashblock.index == 0 {
188 self.metrics.last_flashblock_length.record(self.sequences.pending().count() as f64);
189 }
190
191 if let Err(err) = self.sequences.insert_flashblock(flashblock) {
192 trace!(target: "flashblocks", %err, "Failed to insert flashblock");
193 }
194 }
195
196 fn notify_received_flashblock(&self, flashblock: &FlashBlock) {
198 if self.received_flashblocks_tx.receiver_count() > 0 {
199 let _ = self.received_flashblocks_tx.send(Arc::new(flashblock.clone()));
200 }
201 }
202
203 fn try_start_build_job(&mut self) {
205 if self.job.is_some() {
206 return; }
208
209 let Some(latest) = self.builder.provider().latest_header().ok().flatten() else {
210 return;
211 };
212
213 let Some(args) = self.sequences.next_buildable_args(latest.hash(), latest.timestamp())
214 else {
215 return; };
217
218 let fb_info = FlashBlockBuildInfo {
220 parent_hash: args.base.parent_hash,
221 index: args.last_flashblock_index,
222 block_number: args.base.block_number,
223 };
224 self.metrics.current_block_height.set(fb_info.block_number as f64);
225 self.metrics.current_index.set(fb_info.index as f64);
226 let _ = self.in_progress_tx.send(Some(fb_info));
227
228 let (tx, rx) = oneshot::channel();
229 let builder = self.builder.clone();
230 self.spawner.spawn_blocking(Box::pin(async move {
231 let _ = tx.send(builder.execute(args));
232 }));
233 self.job = Some((Instant::now(), rx));
234 }
235}
236
237#[derive(Debug, Clone, Copy)]
239pub struct FlashBlockBuildInfo {
240 pub parent_hash: B256,
242 pub index: u64,
244 pub block_number: u64,
246}
247
248type BuildJob<N> =
249 (Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);
250
251#[derive(Metrics)]
252#[metrics(scope = "flashblock_service")]
253struct FlashBlockServiceMetrics {
254 last_flashblock_length: Histogram,
256 execution_duration: Histogram,
258 current_block_height: Gauge,
260 current_index: Gauge,
262}