reth_optimism_flashblocks/
sequence.rs1use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_eips::eip2718::WithEncoded;
3use alloy_primitives::B256;
4use alloy_rpc_types_engine::PayloadId;
5use core::mem;
6use eyre::{bail, OptionExt};
7use reth_primitives_traits::{Recovered, SignedTransaction};
8use std::{collections::BTreeMap, ops::Deref};
9use tokio::sync::broadcast;
10use tracing::{debug, trace, warn};
11
12const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
14
15#[derive(Debug)]
17pub struct FlashBlockPendingSequence<T> {
18 inner: BTreeMap<u64, PreparedFlashBlock<T>>,
23 block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
25 state_root: Option<B256>,
27}
28
29impl<T> FlashBlockPendingSequence<T>
30where
31 T: SignedTransaction,
32{
33 pub fn new() -> Self {
35 let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
38 Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
39 }
40
41 pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
43 self.block_broadcaster.subscribe()
44 }
45
46 fn clear_and_broadcast_blocks(&mut self) {
48 let flashblocks = mem::take(&mut self.inner);
49
50 if self.block_broadcaster.receiver_count() > 0 {
52 let flashblocks = match FlashBlockCompleteSequence::new(
53 flashblocks.into_iter().map(|block| block.1.into()).collect(),
54 self.state_root,
55 ) {
56 Ok(flashblocks) => flashblocks,
57 Err(err) => {
58 debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence");
59 return;
60 }
61 };
62
63 if let Err(err) = self.block_broadcaster.send(flashblocks) {
67 warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers");
68 }
69 }
70 }
71
72 pub fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
76 if flashblock.index == 0 {
77 trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
78
79 self.clear_and_broadcast_blocks();
81
82 self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
83 return Ok(())
84 }
85
86 let same_block = self.block_number() == Some(flashblock.metadata.block_number);
89 let same_payload = self.payload_id() == Some(flashblock.payload_id);
90
91 if same_block && same_payload {
92 trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
93 self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
94 } else {
95 trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
96 }
97
98 Ok(())
99 }
100
101 pub const fn set_state_root(&mut self, state_root: Option<B256>) {
103 self.state_root = state_root;
104 }
105
106 pub fn ready_transactions(&self) -> impl Iterator<Item = WithEncoded<Recovered<T>>> + '_ {
113 self.inner
114 .values()
115 .enumerate()
116 .take_while(|(idx, block)| {
117 block.block().index == *idx as u64
119 })
120 .flat_map(|(_, block)| block.txs.clone())
121 }
122
123 pub fn block_number(&self) -> Option<u64> {
125 Some(self.inner.values().next()?.block().metadata.block_number)
126 }
127
128 pub fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
130 self.inner.values().next()?.block().base.clone()
131 }
132
133 pub fn count(&self) -> usize {
135 self.inner.len()
136 }
137
138 pub fn last_flashblock(&self) -> Option<&FlashBlock> {
140 self.inner.last_key_value().map(|(_, b)| &b.block)
141 }
142
143 pub fn index(&self) -> Option<u64> {
145 Some(self.inner.values().last()?.block().index)
146 }
147 pub fn payload_id(&self) -> Option<PayloadId> {
149 Some(self.inner.values().next()?.block().payload_id)
150 }
151}
152
153impl<T> Default for FlashBlockPendingSequence<T>
154where
155 T: SignedTransaction,
156{
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162#[derive(Debug, Clone)]
165pub struct FlashBlockCompleteSequence {
166 inner: Vec<FlashBlock>,
167 state_root: Option<B256>,
169}
170
171impl FlashBlockCompleteSequence {
172 pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
178 let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
179
180 first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
182
183 if !blocks.iter().enumerate().all(|(idx, block)| {
185 idx == block.index as usize &&
186 block.payload_id == first_block.payload_id &&
187 block.metadata.block_number == first_block.metadata.block_number
188 }) {
189 bail!("Flashblock inconsistencies detected in sequence");
190 }
191
192 Ok(Self { inner: blocks, state_root })
193 }
194
195 pub fn block_number(&self) -> u64 {
197 self.inner.first().unwrap().metadata.block_number
198 }
199
200 pub fn payload_base(&self) -> &ExecutionPayloadBaseV1 {
202 self.inner.first().unwrap().base.as_ref().unwrap()
203 }
204
205 pub const fn count(&self) -> usize {
207 self.inner.len()
208 }
209
210 pub fn last(&self) -> &FlashBlock {
212 self.inner.last().unwrap()
213 }
214
215 pub const fn state_root(&self) -> Option<B256> {
217 self.state_root
218 }
219}
220
221impl Deref for FlashBlockCompleteSequence {
222 type Target = Vec<FlashBlock>;
223
224 fn deref(&self) -> &Self::Target {
225 &self.inner
226 }
227}
228
229impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
230 type Error = eyre::Error;
231 fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
232 Self::new(
233 sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
234 sequence.state_root,
235 )
236 }
237}
238
239#[derive(Debug)]
240struct PreparedFlashBlock<T> {
241 txs: Vec<WithEncoded<Recovered<T>>>,
243 block: FlashBlock,
245}
246
247impl<T> PreparedFlashBlock<T> {
248 const fn block(&self) -> &FlashBlock {
249 &self.block
250 }
251}
252
253impl<T> From<PreparedFlashBlock<T>> for FlashBlock {
254 fn from(val: PreparedFlashBlock<T>) -> Self {
255 val.block
256 }
257}
258
259impl<T> PreparedFlashBlock<T>
260where
261 T: SignedTransaction,
262{
263 fn new(block: FlashBlock) -> eyre::Result<Self> {
267 let mut txs = Vec::with_capacity(block.diff.transactions.len());
268 for encoded in block.diff.transactions.iter().cloned() {
269 let tx = T::decode_2718_exact(encoded.as_ref())?;
270 let signer = tx.try_recover()?;
271 let tx = WithEncoded::new(encoded, tx.with_signer(signer));
272 txs.push(tx);
273 }
274
275 Ok(Self { txs, block })
276 }
277}
278
279impl<T> Deref for PreparedFlashBlock<T> {
280 type Target = FlashBlock;
281
282 fn deref(&self) -> &Self::Target {
283 &self.block
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::ExecutionPayloadFlashblockDeltaV1;
291 use alloy_consensus::{
292 transaction::SignerRecoverable, EthereumTxEnvelope, EthereumTypedTransaction, TxEip1559,
293 };
294 use alloy_eips::Encodable2718;
295 use alloy_primitives::{hex, Signature, TxKind, U256};
296
297 #[test]
298 fn test_sequence_stops_before_gap() {
299 let mut sequence = FlashBlockPendingSequence::new();
300 let tx = EthereumTxEnvelope::new_unhashed(
301 EthereumTypedTransaction::<TxEip1559>::Eip1559(TxEip1559 {
302 chain_id: 4,
303 nonce: 26u64,
304 max_priority_fee_per_gas: 1500000000,
305 max_fee_per_gas: 1500000013,
306 gas_limit: 21_000u64,
307 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
308 value: U256::from(3000000000000000000u64),
309 input: Default::default(),
310 access_list: Default::default(),
311 }),
312 Signature::new(
313 U256::from_be_bytes(hex!(
314 "59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd"
315 )),
316 U256::from_be_bytes(hex!(
317 "016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469"
318 )),
319 true,
320 ),
321 );
322 let tx = Recovered::new_unchecked(tx.clone(), tx.recover_signer_unchecked().unwrap());
323
324 sequence
325 .insert(FlashBlock {
326 payload_id: Default::default(),
327 index: 0,
328 base: None,
329 diff: ExecutionPayloadFlashblockDeltaV1 {
330 transactions: vec![tx.encoded_2718().into()],
331 ..Default::default()
332 },
333 metadata: Default::default(),
334 })
335 .unwrap();
336
337 sequence
338 .insert(FlashBlock {
339 payload_id: Default::default(),
340 index: 2,
341 base: None,
342 diff: Default::default(),
343 metadata: Default::default(),
344 })
345 .unwrap();
346
347 let actual_txs: Vec<_> = sequence.ready_transactions().collect();
348 let expected_txs = vec![WithEncoded::new(tx.encoded_2718().into(), tx)];
349
350 assert_eq!(actual_txs, expected_txs);
351 }
352
353 #[test]
354 fn test_sequence_sends_flashblocks_to_subscribers() {
355 let mut sequence = FlashBlockPendingSequence::<EthereumTxEnvelope<TxEip1559>>::new();
356 let mut subscriber = sequence.subscribe_block_sequence();
357
358 for idx in 0..10 {
359 sequence
360 .insert(FlashBlock {
361 payload_id: Default::default(),
362 index: idx,
363 base: Some(ExecutionPayloadBaseV1::default()),
364 diff: Default::default(),
365 metadata: Default::default(),
366 })
367 .unwrap();
368 }
369
370 assert_eq!(sequence.count(), 10);
371
372 let no_flashblock = subscriber.try_recv();
374 assert!(no_flashblock.is_err());
375
376 sequence
378 .insert(FlashBlock {
379 payload_id: Default::default(),
380 index: 0,
381 base: Some(ExecutionPayloadBaseV1::default()),
382 diff: Default::default(),
383 metadata: Default::default(),
384 })
385 .unwrap();
386
387 let flashblocks = subscriber.try_recv().unwrap();
388 assert_eq!(flashblocks.count(), 10);
389
390 for (idx, block) in flashblocks.iter().enumerate() {
391 assert_eq!(block.index, idx as u64);
392 }
393 }
394}