reth_optimism_flashblocks/
sequence.rs

1use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_eips::eip2718::WithEncoded;
3use alloy_primitives::B256;
4use alloy_rpc_types_engine::PayloadId;
5use core::mem;
6use eyre::{bail, OptionExt};
7use reth_primitives_traits::{Recovered, SignedTransaction};
8use std::{collections::BTreeMap, ops::Deref};
9use tokio::sync::broadcast;
10use tracing::{debug, trace, warn};
11
/// Capacity of the broadcast channel that carries completed flashblock sequences to
/// subscribers.
const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
14
15/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
16#[derive(Debug)]
17pub struct FlashBlockPendingSequence<T> {
18    /// tracks the individual flashblocks in order
19    ///
20    /// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new
21    /// pending block, we expect 11 flashblocks per slot.
22    inner: BTreeMap<u64, PreparedFlashBlock<T>>,
23    /// Broadcasts flashblocks to subscribers.
24    block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
25    /// Optional properly computed state root for the current sequence.
26    state_root: Option<B256>,
27}
28
29impl<T> FlashBlockPendingSequence<T>
30where
31    T: SignedTransaction,
32{
33    /// Create a new pending sequence.
34    pub fn new() -> Self {
35        // Note: if the channel is full, send will not block but rather overwrite the oldest
36        // messages. Order is preserved.
37        let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
38        Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
39    }
40
41    /// Gets a subscriber to the flashblock sequences produced.
42    pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
43        self.block_broadcaster.subscribe()
44    }
45
46    // Clears the state and broadcasts the blocks produced to subscribers.
47    fn clear_and_broadcast_blocks(&mut self) {
48        let flashblocks = mem::take(&mut self.inner);
49
50        // If there are any subscribers, send the flashblocks to them.
51        if self.block_broadcaster.receiver_count() > 0 {
52            let flashblocks = match FlashBlockCompleteSequence::new(
53                flashblocks.into_iter().map(|block| block.1.into()).collect(),
54                self.state_root,
55            ) {
56                Ok(flashblocks) => flashblocks,
57                Err(err) => {
58                    debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence");
59                    return;
60                }
61            };
62
63            // Note: this should only ever fail if there are no receivers. This can happen if
64            // there is a race condition between the clause right above and this
65            // one. We can simply warn the user and continue.
66            if let Err(err) = self.block_broadcaster.send(flashblocks) {
67                warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers");
68            }
69        }
70    }
71
72    /// Inserts a new block into the sequence.
73    ///
74    /// A [`FlashBlock`] with index 0 resets the set.
75    pub fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
76        if flashblock.index == 0 {
77            trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
78
79            // Flash block at index zero resets the whole state.
80            self.clear_and_broadcast_blocks();
81
82            self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
83            return Ok(())
84        }
85
86        // only insert if we previously received the same block and payload, assume we received
87        // index 0
88        let same_block = self.block_number() == Some(flashblock.metadata.block_number);
89        let same_payload = self.payload_id() == Some(flashblock.payload_id);
90
91        if same_block && same_payload {
92            trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len()  ,"Received followup flashblock");
93            self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
94        } else {
95            trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number()  ,"Ignoring untracked flashblock following");
96        }
97
98        Ok(())
99    }
100
101    /// Set state root
102    pub const fn set_state_root(&mut self, state_root: Option<B256>) {
103        self.state_root = state_root;
104    }
105
106    /// Iterator over sequence of executable transactions.
107    ///
108    /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in
109    /// the sequence
110    ///
111    /// Note: flashblocks start at `index 0`.
112    pub fn ready_transactions(&self) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
113        self.inner
114            .values()
115            .enumerate()
116            .take_while(|(idx, block)| {
117                // flashblock index 0 is the first flashblock
118                block.block().index == *idx as u64
119            })
120            .flat_map(|(_, block)| block.txs.clone())
121    }
122
123    /// Returns the first block number
124    pub fn block_number(&self) -> Option<u64> {
125        Some(self.inner.values().next()?.block().metadata.block_number)
126    }
127
128    /// Returns the payload base of the first tracked flashblock.
129    pub fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
130        self.inner.values().next()?.block().base.clone()
131    }
132
133    /// Returns the number of tracked flashblocks.
134    pub fn count(&self) -> usize {
135        self.inner.len()
136    }
137
138    /// Returns the reference to the last flashblock.
139    pub fn last_flashblock(&self) -> Option<&FlashBlock> {
140        self.inner.last_key_value().map(|(_, b)| &b.block)
141    }
142
143    /// Returns the current/latest flashblock index in the sequence
144    pub fn index(&self) -> Option<u64> {
145        Some(self.inner.values().last()?.block().index)
146    }
147    /// Returns the payload id of the first tracked flashblock in the current sequence.
148    pub fn payload_id(&self) -> Option<PayloadId> {
149        Some(self.inner.values().next()?.block().payload_id)
150    }
151}
152
153impl<T> Default for FlashBlockPendingSequence<T>
154where
155    T: SignedTransaction,
156{
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162/// A complete sequence of flashblocks, often corresponding to a full block.
163/// Ensure invariants of a complete flashblocks sequence.
164#[derive(Debug, Clone)]
165pub struct FlashBlockCompleteSequence {
166    inner: Vec<FlashBlock>,
167    /// Optional state root for the current sequence
168    state_root: Option<B256>,
169}
170
171impl FlashBlockCompleteSequence {
172    /// Create a complete sequence from a vector of flashblocks.
173    /// Ensure that:
174    /// * vector is not empty
175    /// * first flashblock have the base payload
176    /// * sequence of flashblocks is sound (successive index from 0, same payload id, ...)
177    pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
178        let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
179
180        // Ensure that first flashblock have base
181        first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
182
183        // Ensure that index are successive from 0, have same block number and payload id
184        if !blocks.iter().enumerate().all(|(idx, block)| {
185            idx == block.index as usize &&
186                block.payload_id == first_block.payload_id &&
187                block.metadata.block_number == first_block.metadata.block_number
188        }) {
189            bail!("Flashblock inconsistencies detected in sequence");
190        }
191
192        Ok(Self { inner: blocks, state_root })
193    }
194
195    /// Returns the block number
196    pub fn block_number(&self) -> u64 {
197        self.inner.first().unwrap().metadata.block_number
198    }
199
200    /// Returns the payload base of the first flashblock.
201    pub fn payload_base(&self) -> &ExecutionPayloadBaseV1 {
202        self.inner.first().unwrap().base.as_ref().unwrap()
203    }
204
205    /// Returns the number of flashblocks in the sequence.
206    pub const fn count(&self) -> usize {
207        self.inner.len()
208    }
209
210    /// Returns the last flashblock in the sequence.
211    pub fn last(&self) -> &FlashBlock {
212        self.inner.last().unwrap()
213    }
214
215    /// Returns the state root for the current sequence
216    pub const fn state_root(&self) -> Option<B256> {
217        self.state_root
218    }
219}
220
221impl Deref for FlashBlockCompleteSequence {
222    type Target = Vec<FlashBlock>;
223
224    fn deref(&self) -> &Self::Target {
225        &self.inner
226    }
227}
228
229impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
230    type Error = eyre::Error;
231    fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
232        Self::new(
233            sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
234            sequence.state_root,
235        )
236    }
237}
238
239#[derive(Debug)]
240struct PreparedFlashBlock<T> {
241    /// The prepared transactions, ready for execution
242    txs: Vec<WithEncoded<Recovered<T>>>,
243    /// The tracked flashblock
244    block: FlashBlock,
245}
246
247impl<T> PreparedFlashBlock<T> {
248    const fn block(&self) -> &FlashBlock {
249        &self.block
250    }
251}
252
253impl<T> From<PreparedFlashBlock<T>> for FlashBlock {
254    fn from(val: PreparedFlashBlock<T>) -> Self {
255        val.block
256    }
257}
258
259impl<T> PreparedFlashBlock<T>
260where
261    T: SignedTransaction,
262{
263    /// Creates a flashblock that is ready for execution by preparing all transactions
264    ///
265    /// Returns an error if decoding or signer recovery fails.
266    fn new(block: FlashBlock) -> eyre::Result<Self> {
267        let mut txs = Vec::with_capacity(block.diff.transactions.len());
268        for encoded in block.diff.transactions.iter().cloned() {
269            let tx = T::decode_2718_exact(encoded.as_ref())?;
270            let signer = tx.try_recover()?;
271            let tx = WithEncoded::new(encoded, tx.with_signer(signer));
272            txs.push(tx);
273        }
274
275        Ok(Self { txs, block })
276    }
277}
278
279impl<T> Deref for PreparedFlashBlock<T> {
280    type Target = FlashBlock;
281
282    fn deref(&self) -> &Self::Target {
283        &self.block
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ExecutionPayloadFlashblockDeltaV1;
    use alloy_consensus::{
        transaction::SignerRecoverable, EthereumTxEnvelope, EthereumTypedTransaction, TxEip1559,
    };
    use alloy_eips::Encodable2718;
    use alloy_primitives::{hex, Signature, TxKind, U256};

    /// Transactions of flashblocks located after a gap in the indices must not be reported as
    /// ready.
    #[test]
    fn test_sequence_stops_before_gap() {
        let mut sequence = FlashBlockPendingSequence::new();
        let tx = EthereumTxEnvelope::new_unhashed(
            EthereumTypedTransaction::<TxEip1559>::Eip1559(TxEip1559 {
                chain_id: 4,
                nonce: 26u64,
                max_priority_fee_per_gas: 1500000000,
                max_fee_per_gas: 1500000013,
                gas_limit: 21_000u64,
                to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
                value: U256::from(3000000000000000000u64),
                input: Default::default(),
                access_list: Default::default(),
            }),
            Signature::new(
                U256::from_be_bytes(hex!(
                    "59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd"
                )),
                U256::from_be_bytes(hex!(
                    "016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469"
                )),
                true,
            ),
        );
        let tx = Recovered::new_unchecked(tx.clone(), tx.recover_signer_unchecked().unwrap());

        sequence
            .insert(FlashBlock {
                payload_id: Default::default(),
                index: 0,
                base: None,
                diff: ExecutionPayloadFlashblockDeltaV1 {
                    transactions: vec![tx.encoded_2718().into()],
                    ..Default::default()
                },
                metadata: Default::default(),
            })
            .unwrap();

        // Index 1 is missing. The flashblock after the gap carries a transaction as well, so the
        // assertion below fails if the gap is not respected.
        sequence
            .insert(FlashBlock {
                payload_id: Default::default(),
                index: 2,
                base: None,
                diff: ExecutionPayloadFlashblockDeltaV1 {
                    transactions: vec![tx.encoded_2718().into()],
                    ..Default::default()
                },
                metadata: Default::default(),
            })
            .unwrap();

        // Both flashblocks are tracked, but only the one before the gap is ready.
        assert_eq!(sequence.count(), 2);

        let actual_txs: Vec<_> = sequence.ready_transactions().collect();
        let expected_txs = vec![WithEncoded::new(tx.encoded_2718().into(), tx)];

        assert_eq!(actual_txs, expected_txs);
    }

    /// A finished sequence is broadcast to subscribers once the next sequence starts.
    #[test]
    fn test_sequence_sends_flashblocks_to_subscribers() {
        let mut sequence = FlashBlockPendingSequence::<EthereumTxEnvelope<TxEip1559>>::new();
        let mut subscriber = sequence.subscribe_block_sequence();

        for idx in 0..10 {
            sequence
                .insert(FlashBlock {
                    payload_id: Default::default(),
                    index: idx,
                    base: Some(ExecutionPayloadBaseV1::default()),
                    diff: Default::default(),
                    metadata: Default::default(),
                })
                .unwrap();
        }

        assert_eq!(sequence.count(), 10);

        // Then we don't receive anything until we insert a new flashblock
        let no_flashblock = subscriber.try_recv();
        assert!(no_flashblock.is_err());

        // Let's insert a new flashblock with index 0
        sequence
            .insert(FlashBlock {
                payload_id: Default::default(),
                index: 0,
                base: Some(ExecutionPayloadBaseV1::default()),
                diff: Default::default(),
                metadata: Default::default(),
            })
            .unwrap();

        let flashblocks = subscriber.try_recv().unwrap();
        assert_eq!(flashblocks.count(), 10);

        for (idx, block) in flashblocks.iter().enumerate() {
            assert_eq!(block.index, idx as u64);
        }

        // The new sequence only tracks the flashblock that started it.
        assert_eq!(sequence.count(), 1);
    }

    /// A complete sequence cannot be built from nothing or without a base payload.
    #[test]
    fn test_complete_sequence_requires_blocks_and_base() {
        assert!(FlashBlockCompleteSequence::new(Vec::new(), None).is_err());

        let without_base = FlashBlock {
            payload_id: Default::default(),
            index: 0,
            base: None,
            diff: Default::default(),
            metadata: Default::default(),
        };
        assert!(FlashBlockCompleteSequence::new(vec![without_base], None).is_err());
    }

    /// A complete sequence must have successive indices starting from 0.
    #[test]
    fn test_complete_sequence_rejects_index_gap() {
        let block_at = |index| FlashBlock {
            payload_id: Default::default(),
            index,
            base: Some(ExecutionPayloadBaseV1::default()),
            diff: Default::default(),
            metadata: Default::default(),
        };

        assert!(FlashBlockCompleteSequence::new(vec![block_at(0), block_at(2)], None).is_err());

        let complete =
            FlashBlockCompleteSequence::new(vec![block_at(0), block_at(1)], None).unwrap();
        assert_eq!(complete.count(), 2);
        assert_eq!(complete.last().index, 1);
    }
}