reth_optimism_flashblocks/
sequence.rs1use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_eips::eip2718::WithEncoded;
3use alloy_primitives::B256;
4use core::mem;
5use eyre::{bail, OptionExt};
6use reth_primitives_traits::{Recovered, SignedTransaction};
7use std::{collections::BTreeMap, ops::Deref};
8use tokio::sync::broadcast;
9use tracing::{debug, trace, warn};
10
11const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
13
14#[derive(Debug)]
16pub(crate) struct FlashBlockPendingSequence<T> {
17 inner: BTreeMap<u64, PreparedFlashBlock<T>>,
22 block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
24 state_root: Option<B256>,
26}
27
28impl<T> FlashBlockPendingSequence<T>
29where
30 T: SignedTransaction,
31{
32 pub(crate) fn new() -> Self {
33 let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
36 Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
37 }
38
39 pub(crate) fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
41 self.block_broadcaster.subscribe()
42 }
43
44 fn clear_and_broadcast_blocks(&mut self) {
46 let flashblocks = mem::take(&mut self.inner);
47
48 if self.block_broadcaster.receiver_count() > 0 {
50 let flashblocks = match FlashBlockCompleteSequence::new(
51 flashblocks.into_iter().map(|block| block.1.into()).collect(),
52 self.state_root,
53 ) {
54 Ok(flashblocks) => flashblocks,
55 Err(err) => {
56 debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence");
57 return;
58 }
59 };
60
61 if let Err(err) = self.block_broadcaster.send(flashblocks) {
65 warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers");
66 }
67 }
68 }
69
70 pub(crate) fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
74 if flashblock.index == 0 {
75 trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
76
77 self.clear_and_broadcast_blocks();
79
80 self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
81 return Ok(())
82 }
83
84 if self.block_number() == Some(flashblock.metadata.block_number) {
86 trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
87 self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
88 } else {
89 trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
90 }
91
92 Ok(())
93 }
94
95 pub(crate) const fn set_state_root(&mut self, state_root: Option<B256>) {
97 self.state_root = state_root;
98 }
99
100 pub(crate) fn ready_transactions(
107 &self,
108 ) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
109 self.inner
110 .values()
111 .enumerate()
112 .take_while(|(idx, block)| {
113 block.block().index == *idx as u64
115 })
116 .flat_map(|(_, block)| block.txs.clone())
117 }
118
119 pub(crate) fn block_number(&self) -> Option<u64> {
121 Some(self.inner.values().next()?.block().metadata.block_number)
122 }
123
124 pub(crate) fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
126 self.inner.values().next()?.block().base.clone()
127 }
128
129 pub(crate) fn count(&self) -> usize {
131 self.inner.len()
132 }
133
134 pub(crate) fn last_flashblock(&self) -> Option<&FlashBlock> {
136 self.inner.last_key_value().map(|(_, b)| &b.block)
137 }
138
139 pub(crate) fn index(&self) -> Option<u64> {
141 Some(self.inner.values().last()?.block().index)
142 }
143}
144
145#[derive(Debug, Clone)]
148pub struct FlashBlockCompleteSequence {
149 inner: Vec<FlashBlock>,
150 state_root: Option<B256>,
152}
153
154impl FlashBlockCompleteSequence {
155 pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
161 let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
162
163 first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
165
166 if !blocks.iter().enumerate().all(|(idx, block)| {
168 idx == block.index as usize &&
169 block.payload_id == first_block.payload_id &&
170 block.metadata.block_number == first_block.metadata.block_number
171 }) {
172 bail!("Flashblock inconsistencies detected in sequence");
173 }
174
175 Ok(Self { inner: blocks, state_root })
176 }
177
178 pub fn block_number(&self) -> u64 {
180 self.inner.first().unwrap().metadata.block_number
181 }
182
183 pub fn payload_base(&self) -> &ExecutionPayloadBaseV1 {
185 self.inner.first().unwrap().base.as_ref().unwrap()
186 }
187
188 pub const fn count(&self) -> usize {
190 self.inner.len()
191 }
192
193 pub fn last(&self) -> &FlashBlock {
195 self.inner.last().unwrap()
196 }
197
198 pub const fn state_root(&self) -> Option<B256> {
200 self.state_root
201 }
202}
203
204impl Deref for FlashBlockCompleteSequence {
205 type Target = Vec<FlashBlock>;
206
207 fn deref(&self) -> &Self::Target {
208 &self.inner
209 }
210}
211
212impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
213 type Error = eyre::Error;
214 fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
215 Self::new(
216 sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
217 sequence.state_root,
218 )
219 }
220}
221
222#[derive(Debug)]
223struct PreparedFlashBlock<T> {
224 txs: Vec<WithEncoded<Recovered<T>>>,
226 block: FlashBlock,
228}
229
230impl<T> PreparedFlashBlock<T> {
231 const fn block(&self) -> &FlashBlock {
232 &self.block
233 }
234}
235
236impl<T> From<PreparedFlashBlock<T>> for FlashBlock {
237 fn from(val: PreparedFlashBlock<T>) -> Self {
238 val.block
239 }
240}
241
242impl<T> PreparedFlashBlock<T>
243where
244 T: SignedTransaction,
245{
246 fn new(block: FlashBlock) -> eyre::Result<Self> {
250 let mut txs = Vec::with_capacity(block.diff.transactions.len());
251 for encoded in block.diff.transactions.iter().cloned() {
252 let tx = T::decode_2718_exact(encoded.as_ref())?;
253 let signer = tx.try_recover()?;
254 let tx = WithEncoded::new(encoded, tx.with_signer(signer));
255 txs.push(tx);
256 }
257
258 Ok(Self { txs, block })
259 }
260}
261
262impl<T> Deref for PreparedFlashBlock<T> {
263 type Target = FlashBlock;
264
265 fn deref(&self) -> &Self::Target {
266 &self.block
267 }
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::ExecutionPayloadFlashblockDeltaV1;
274 use alloy_consensus::{
275 transaction::SignerRecoverable, EthereumTxEnvelope, EthereumTypedTransaction, TxEip1559,
276 };
277 use alloy_eips::Encodable2718;
278 use alloy_primitives::{hex, Signature, TxKind, U256};
279
280 #[test]
281 fn test_sequence_stops_before_gap() {
282 let mut sequence = FlashBlockPendingSequence::new();
283 let tx = EthereumTxEnvelope::new_unhashed(
284 EthereumTypedTransaction::<TxEip1559>::Eip1559(TxEip1559 {
285 chain_id: 4,
286 nonce: 26u64,
287 max_priority_fee_per_gas: 1500000000,
288 max_fee_per_gas: 1500000013,
289 gas_limit: 21_000u64,
290 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
291 value: U256::from(3000000000000000000u64),
292 input: Default::default(),
293 access_list: Default::default(),
294 }),
295 Signature::new(
296 U256::from_be_bytes(hex!(
297 "59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd"
298 )),
299 U256::from_be_bytes(hex!(
300 "016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469"
301 )),
302 true,
303 ),
304 );
305 let tx = Recovered::new_unchecked(tx.clone(), tx.recover_signer_unchecked().unwrap());
306
307 sequence
308 .insert(FlashBlock {
309 payload_id: Default::default(),
310 index: 0,
311 base: None,
312 diff: ExecutionPayloadFlashblockDeltaV1 {
313 transactions: vec![tx.encoded_2718().into()],
314 ..Default::default()
315 },
316 metadata: Default::default(),
317 })
318 .unwrap();
319
320 sequence
321 .insert(FlashBlock {
322 payload_id: Default::default(),
323 index: 2,
324 base: None,
325 diff: Default::default(),
326 metadata: Default::default(),
327 })
328 .unwrap();
329
330 let actual_txs: Vec<_> = sequence.ready_transactions().collect();
331 let expected_txs = vec![WithEncoded::new(tx.encoded_2718().into(), tx)];
332
333 assert_eq!(actual_txs, expected_txs);
334 }
335
336 #[test]
337 fn test_sequence_sends_flashblocks_to_subscribers() {
338 let mut sequence = FlashBlockPendingSequence::<EthereumTxEnvelope<TxEip1559>>::new();
339 let mut subscriber = sequence.subscribe_block_sequence();
340
341 for idx in 0..10 {
342 sequence
343 .insert(FlashBlock {
344 payload_id: Default::default(),
345 index: idx,
346 base: Some(ExecutionPayloadBaseV1::default()),
347 diff: Default::default(),
348 metadata: Default::default(),
349 })
350 .unwrap();
351 }
352
353 assert_eq!(sequence.count(), 10);
354
355 let no_flashblock = subscriber.try_recv();
357 assert!(no_flashblock.is_err());
358
359 sequence
361 .insert(FlashBlock {
362 payload_id: Default::default(),
363 index: 0,
364 base: Some(ExecutionPayloadBaseV1::default()),
365 diff: Default::default(),
366 metadata: Default::default(),
367 })
368 .unwrap();
369
370 let flashblocks = subscriber.try_recv().unwrap();
371 assert_eq!(flashblocks.count(), 10);
372
373 for (idx, block) in flashblocks.iter().enumerate() {
374 assert_eq!(block.index, idx as u64);
375 }
376 }
377}