reth_optimism_flashblocks/
sequence.rs

1use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_eips::eip2718::WithEncoded;
3use alloy_primitives::B256;
4use core::mem;
5use eyre::{bail, OptionExt};
6use reth_primitives_traits::{Recovered, SignedTransaction};
7use std::{collections::BTreeMap, ops::Deref};
8use tokio::sync::broadcast;
9use tracing::{debug, trace, warn};
10
11/// The size of the broadcast channel for completed flashblock sequences.
12const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
13
14/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
15#[derive(Debug)]
16pub(crate) struct FlashBlockPendingSequence<T> {
17    /// tracks the individual flashblocks in order
18    ///
19    /// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new
20    /// pending block, we expect 11 flashblocks per slot.
21    inner: BTreeMap<u64, PreparedFlashBlock<T>>,
22    /// Broadcasts flashblocks to subscribers.
23    block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
24    /// Optional properly computed state root for the current sequence.
25    state_root: Option<B256>,
26}
27
28impl<T> FlashBlockPendingSequence<T>
29where
30    T: SignedTransaction,
31{
32    pub(crate) fn new() -> Self {
33        // Note: if the channel is full, send will not block but rather overwrite the oldest
34        // messages. Order is preserved.
35        let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
36        Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
37    }
38
39    /// Gets a subscriber to the flashblock sequences produced.
40    pub(crate) fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
41        self.block_broadcaster.subscribe()
42    }
43
44    // Clears the state and broadcasts the blocks produced to subscribers.
45    fn clear_and_broadcast_blocks(&mut self) {
46        let flashblocks = mem::take(&mut self.inner);
47
48        // If there are any subscribers, send the flashblocks to them.
49        if self.block_broadcaster.receiver_count() > 0 {
50            let flashblocks = match FlashBlockCompleteSequence::new(
51                flashblocks.into_iter().map(|block| block.1.into()).collect(),
52                self.state_root,
53            ) {
54                Ok(flashblocks) => flashblocks,
55                Err(err) => {
56                    debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence");
57                    return;
58                }
59            };
60
61            // Note: this should only ever fail if there are no receivers. This can happen if
62            // there is a race condition between the clause right above and this
63            // one. We can simply warn the user and continue.
64            if let Err(err) = self.block_broadcaster.send(flashblocks) {
65                warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers");
66            }
67        }
68    }
69
70    /// Inserts a new block into the sequence.
71    ///
72    /// A [`FlashBlock`] with index 0 resets the set.
73    pub(crate) fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
74        if flashblock.index == 0 {
75            trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
76
77            // Flash block at index zero resets the whole state.
78            self.clear_and_broadcast_blocks();
79
80            self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
81            return Ok(())
82        }
83
84        // only insert if we previously received the same block, assume we received index 0
85        if self.block_number() == Some(flashblock.metadata.block_number) {
86            trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len()  ,"Received followup flashblock");
87            self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
88        } else {
89            trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number()  ,"Ignoring untracked flashblock following");
90        }
91
92        Ok(())
93    }
94
95    /// Set state root
96    pub(crate) const fn set_state_root(&mut self, state_root: Option<B256>) {
97        self.state_root = state_root;
98    }
99
100    /// Iterator over sequence of executable transactions.
101    ///
102    /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in
103    /// the sequence
104    ///
105    /// Note: flashblocks start at `index 0`.
106    pub(crate) fn ready_transactions(
107        &self,
108    ) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
109        self.inner
110            .values()
111            .enumerate()
112            .take_while(|(idx, block)| {
113                // flashblock index 0 is the first flashblock
114                block.block().index == *idx as u64
115            })
116            .flat_map(|(_, block)| block.txs.clone())
117    }
118
119    /// Returns the first block number
120    pub(crate) fn block_number(&self) -> Option<u64> {
121        Some(self.inner.values().next()?.block().metadata.block_number)
122    }
123
124    /// Returns the payload base of the first tracked flashblock.
125    pub(crate) fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
126        self.inner.values().next()?.block().base.clone()
127    }
128
129    /// Returns the number of tracked flashblocks.
130    pub(crate) fn count(&self) -> usize {
131        self.inner.len()
132    }
133
134    /// Returns the reference to the last flashblock.
135    pub(crate) fn last_flashblock(&self) -> Option<&FlashBlock> {
136        self.inner.last_key_value().map(|(_, b)| &b.block)
137    }
138
139    /// Returns the current/latest flashblock index in the sequence
140    pub(crate) fn index(&self) -> Option<u64> {
141        Some(self.inner.values().last()?.block().index)
142    }
143}
144
145/// A complete sequence of flashblocks, often corresponding to a full block.
146/// Ensure invariants of a complete flashblocks sequence.
147#[derive(Debug, Clone)]
148pub struct FlashBlockCompleteSequence {
149    inner: Vec<FlashBlock>,
150    /// Optional state root for the current sequence
151    state_root: Option<B256>,
152}
153
154impl FlashBlockCompleteSequence {
155    /// Create a complete sequence from a vector of flashblocks.
156    /// Ensure that:
157    /// * vector is not empty
158    /// * first flashblock have the base payload
159    /// * sequence of flashblocks is sound (successive index from 0, same payload id, ...)
160    pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
161        let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
162
163        // Ensure that first flashblock have base
164        first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
165
166        // Ensure that index are successive from 0, have same block number and payload id
167        if !blocks.iter().enumerate().all(|(idx, block)| {
168            idx == block.index as usize &&
169                block.payload_id == first_block.payload_id &&
170                block.metadata.block_number == first_block.metadata.block_number
171        }) {
172            bail!("Flashblock inconsistencies detected in sequence");
173        }
174
175        Ok(Self { inner: blocks, state_root })
176    }
177
178    /// Returns the block number
179    pub fn block_number(&self) -> u64 {
180        self.inner.first().unwrap().metadata.block_number
181    }
182
183    /// Returns the payload base of the first flashblock.
184    pub fn payload_base(&self) -> &ExecutionPayloadBaseV1 {
185        self.inner.first().unwrap().base.as_ref().unwrap()
186    }
187
188    /// Returns the number of flashblocks in the sequence.
189    pub const fn count(&self) -> usize {
190        self.inner.len()
191    }
192
193    /// Returns the last flashblock in the sequence.
194    pub fn last(&self) -> &FlashBlock {
195        self.inner.last().unwrap()
196    }
197
198    /// Returns the state root for the current sequence
199    pub const fn state_root(&self) -> Option<B256> {
200        self.state_root
201    }
202}
203
204impl Deref for FlashBlockCompleteSequence {
205    type Target = Vec<FlashBlock>;
206
207    fn deref(&self) -> &Self::Target {
208        &self.inner
209    }
210}
211
212impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
213    type Error = eyre::Error;
214    fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
215        Self::new(
216            sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
217            sequence.state_root,
218        )
219    }
220}
221
222#[derive(Debug)]
223struct PreparedFlashBlock<T> {
224    /// The prepared transactions, ready for execution
225    txs: Vec<WithEncoded<Recovered<T>>>,
226    /// The tracked flashblock
227    block: FlashBlock,
228}
229
230impl<T> PreparedFlashBlock<T> {
231    const fn block(&self) -> &FlashBlock {
232        &self.block
233    }
234}
235
236impl<T> From<PreparedFlashBlock<T>> for FlashBlock {
237    fn from(val: PreparedFlashBlock<T>) -> Self {
238        val.block
239    }
240}
241
242impl<T> PreparedFlashBlock<T>
243where
244    T: SignedTransaction,
245{
246    /// Creates a flashblock that is ready for execution by preparing all transactions
247    ///
248    /// Returns an error if decoding or signer recovery fails.
249    fn new(block: FlashBlock) -> eyre::Result<Self> {
250        let mut txs = Vec::with_capacity(block.diff.transactions.len());
251        for encoded in block.diff.transactions.iter().cloned() {
252            let tx = T::decode_2718_exact(encoded.as_ref())?;
253            let signer = tx.try_recover()?;
254            let tx = WithEncoded::new(encoded, tx.with_signer(signer));
255            txs.push(tx);
256        }
257
258        Ok(Self { txs, block })
259    }
260}
261
262impl<T> Deref for PreparedFlashBlock<T> {
263    type Target = FlashBlock;
264
265    fn deref(&self) -> &Self::Target {
266        &self.block
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use crate::ExecutionPayloadFlashblockDeltaV1;
274    use alloy_consensus::{
275        transaction::SignerRecoverable, EthereumTxEnvelope, EthereumTypedTransaction, TxEip1559,
276    };
277    use alloy_eips::Encodable2718;
278    use alloy_primitives::{hex, Signature, TxKind, U256};
279
280    #[test]
281    fn test_sequence_stops_before_gap() {
282        let mut sequence = FlashBlockPendingSequence::new();
283        let tx = EthereumTxEnvelope::new_unhashed(
284            EthereumTypedTransaction::<TxEip1559>::Eip1559(TxEip1559 {
285                chain_id: 4,
286                nonce: 26u64,
287                max_priority_fee_per_gas: 1500000000,
288                max_fee_per_gas: 1500000013,
289                gas_limit: 21_000u64,
290                to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
291                value: U256::from(3000000000000000000u64),
292                input: Default::default(),
293                access_list: Default::default(),
294            }),
295            Signature::new(
296                U256::from_be_bytes(hex!(
297                    "59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd"
298                )),
299                U256::from_be_bytes(hex!(
300                    "016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469"
301                )),
302                true,
303            ),
304        );
305        let tx = Recovered::new_unchecked(tx.clone(), tx.recover_signer_unchecked().unwrap());
306
307        sequence
308            .insert(FlashBlock {
309                payload_id: Default::default(),
310                index: 0,
311                base: None,
312                diff: ExecutionPayloadFlashblockDeltaV1 {
313                    transactions: vec![tx.encoded_2718().into()],
314                    ..Default::default()
315                },
316                metadata: Default::default(),
317            })
318            .unwrap();
319
320        sequence
321            .insert(FlashBlock {
322                payload_id: Default::default(),
323                index: 2,
324                base: None,
325                diff: Default::default(),
326                metadata: Default::default(),
327            })
328            .unwrap();
329
330        let actual_txs: Vec<_> = sequence.ready_transactions().collect();
331        let expected_txs = vec![WithEncoded::new(tx.encoded_2718().into(), tx)];
332
333        assert_eq!(actual_txs, expected_txs);
334    }
335
336    #[test]
337    fn test_sequence_sends_flashblocks_to_subscribers() {
338        let mut sequence = FlashBlockPendingSequence::<EthereumTxEnvelope<TxEip1559>>::new();
339        let mut subscriber = sequence.subscribe_block_sequence();
340
341        for idx in 0..10 {
342            sequence
343                .insert(FlashBlock {
344                    payload_id: Default::default(),
345                    index: idx,
346                    base: Some(ExecutionPayloadBaseV1::default()),
347                    diff: Default::default(),
348                    metadata: Default::default(),
349                })
350                .unwrap();
351        }
352
353        assert_eq!(sequence.count(), 10);
354
355        // Then we don't receive anything until we insert a new flashblock
356        let no_flashblock = subscriber.try_recv();
357        assert!(no_flashblock.is_err());
358
359        // Let's insert a new flashblock with index 0
360        sequence
361            .insert(FlashBlock {
362                payload_id: Default::default(),
363                index: 0,
364                base: Some(ExecutionPayloadBaseV1::default()),
365                diff: Default::default(),
366                metadata: Default::default(),
367            })
368            .unwrap();
369
370        let flashblocks = subscriber.try_recv().unwrap();
371        assert_eq!(flashblocks.count(), 10);
372
373        for (idx, block) in flashblocks.iter().enumerate() {
374            assert_eq!(block.index, idx as u64);
375        }
376    }
377}