reth_optimism_flashblocks/
consensus.rs

1use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx};
2use alloy_primitives::B256;
3use alloy_rpc_types_engine::PayloadStatusEnum;
4use op_alloy_rpc_types_engine::OpExecutionData;
5use reth_engine_primitives::ConsensusEngineHandle;
6use reth_optimism_payload_builder::OpPayloadTypes;
7use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes};
8use tracing::*;
9
10/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`].
11///
12/// This client receives completed flashblock sequences and:
13/// - Attempts to submit `engine_newPayload` if `state_root` is available (non-zero)
14/// - Always sends `engine_forkChoiceUpdated` to drive chain forward
15///
16/// [`FlashBlockService`]: crate::FlashBlockService
17#[derive(Debug)]
18pub struct FlashBlockConsensusClient<P = OpPayloadTypes>
19where
20    P: PayloadTypes,
21{
22    /// Handle to execution client.
23    engine_handle: ConsensusEngineHandle<P>,
24    /// Receiver for completed flashblock sequences from `FlashBlockService`.
25    sequence_receiver: FlashBlockCompleteSequenceRx,
26}
27
28impl<P> FlashBlockConsensusClient<P>
29where
30    P: PayloadTypes,
31    P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>,
32{
33    /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
34    pub const fn new(
35        engine_handle: ConsensusEngineHandle<P>,
36        sequence_receiver: FlashBlockCompleteSequenceRx,
37    ) -> eyre::Result<Self> {
38        Ok(Self { engine_handle, sequence_receiver })
39    }
40
41    /// Attempts to submit a new payload to the engine.
42    ///
43    /// The `TryFrom` conversion will fail if `execution_outcome.state_root` is `B256::ZERO`,
44    /// in which case this returns the `parent_hash` instead to drive the chain forward.
45    ///
46    /// Returns the block hash to use for FCU (either the new block or parent).
47    async fn submit_new_payload(&self, sequence: &FlashBlockCompleteSequence) -> B256 {
48        let payload = match P::ExecutionData::try_from(sequence) {
49            Ok(payload) => payload,
50            Err(err) => {
51                trace!(target: "flashblocks", %err, "Failed payload conversion, using parent hash");
52                return sequence.payload_base().parent_hash;
53            }
54        };
55
56        let block_number = payload.block_number();
57        let block_hash = payload.block_hash();
58        match self.engine_handle.new_payload(payload).await {
59            Ok(result) => {
60                debug!(
61                    target: "flashblocks",
62                    flashblock_count = sequence.count(),
63                    block_number,
64                    %block_hash,
65                    ?result,
66                    "Submitted engine_newPayload",
67                );
68
69                if let PayloadStatusEnum::Invalid { validation_error } = result.status {
70                    debug!(
71                        target: "flashblocks",
72                        block_number,
73                        %block_hash,
74                        %validation_error,
75                        "Payload validation error",
76                    );
77                };
78            }
79            Err(err) => {
80                error!(
81                    target: "flashblocks",
82                    %err,
83                    block_number,
84                    "Failed to submit new payload",
85                );
86            }
87        }
88
89        block_hash
90    }
91
92    /// Submit a forkchoice update to the engine.
93    async fn submit_forkchoice_update(
94        &self,
95        head_block_hash: B256,
96        sequence: &FlashBlockCompleteSequence,
97    ) {
98        let block_number = sequence.block_number();
99        let safe_hash = sequence.payload_base().parent_hash;
100        let finalized_hash = sequence.payload_base().parent_hash;
101        let fcu_state = alloy_rpc_types_engine::ForkchoiceState {
102            head_block_hash,
103            safe_block_hash: safe_hash,
104            finalized_block_hash: finalized_hash,
105        };
106
107        match self
108            .engine_handle
109            .fork_choice_updated(fcu_state, None, EngineApiMessageVersion::V5)
110            .await
111        {
112            Ok(result) => {
113                debug!(
114                    target: "flashblocks",
115                    flashblock_count = sequence.count(),
116                    block_number,
117                    %head_block_hash,
118                    %safe_hash,
119                    %finalized_hash,
120                    ?result,
121                    "Submitted engine_forkChoiceUpdated",
122                )
123            }
124            Err(err) => {
125                error!(
126                    target: "flashblocks",
127                    %err,
128                    block_number,
129                    %head_block_hash,
130                    %safe_hash,
131                    %finalized_hash,
132                    "Failed to submit fork choice update",
133                );
134            }
135        }
136    }
137
138    /// Runs the consensus client loop.
139    ///
140    /// Continuously receives completed flashblock sequences and submits them to the execution
141    /// engine:
142    /// 1. Attempts `engine_newPayload` (only if `state_root` is available)
143    /// 2. Always sends `engine_forkChoiceUpdated` to drive chain forward
144    pub async fn run(mut self) {
145        loop {
146            let Ok(sequence) = self.sequence_receiver.recv().await else {
147                continue;
148            };
149
150            // Returns block_hash for FCU:
151            // - If state_root is available: submits newPayload and returns the new block's hash
152            // - If state_root is zero: skips newPayload and returns parent_hash (no progress yet)
153            let block_hash = self.submit_new_payload(&sequence).await;
154
155            self.submit_forkchoice_update(block_hash, &sequence).await;
156        }
157    }
158}
159
160impl TryFrom<&FlashBlockCompleteSequence> for OpExecutionData {
161    type Error = &'static str;
162
163    fn try_from(sequence: &FlashBlockCompleteSequence) -> Result<Self, Self::Error> {
164        let mut data = Self::from_flashblocks_unchecked(sequence);
165
166        // If execution outcome is available, use the computed state_root and block_hash.
167        // FlashBlockService computes these when building sequences on top of the local tip.
168        if let Some(execution_outcome) = sequence.execution_outcome() {
169            let payload = data.payload.as_v1_mut();
170            payload.state_root = execution_outcome.state_root;
171            payload.block_hash = execution_outcome.block_hash;
172        }
173
174        // Only proceed if we have a valid state_root (non-zero).
175        if data.payload.as_v1_mut().state_root == B256::ZERO {
176            return Err("No state_root available for payload");
177        }
178
179        Ok(data)
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use crate::{sequence::SequenceExecutionOutcome, test_utils::TestFlashBlockFactory};
187
188    mod op_execution_data_conversion {
189        use super::*;
190
191        #[test]
192        fn test_try_from_fails_with_zero_state_root() {
193            // When execution_outcome is None, state_root remains zero and conversion fails
194            let factory = TestFlashBlockFactory::new();
195            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
196
197            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
198
199            let result = OpExecutionData::try_from(&sequence);
200            assert!(result.is_err());
201            assert_eq!(result.unwrap_err(), "No state_root available for payload");
202        }
203
204        #[test]
205        fn test_try_from_succeeds_with_execution_outcome() {
206            // When execution_outcome has state_root, conversion succeeds
207            let factory = TestFlashBlockFactory::new();
208            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
209
210            let execution_outcome = SequenceExecutionOutcome {
211                block_hash: B256::random(),
212                state_root: B256::random(), // Non-zero
213            };
214
215            let sequence =
216                FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
217
218            let result = OpExecutionData::try_from(&sequence);
219            assert!(result.is_ok());
220
221            let mut data = result.unwrap();
222            assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
223            assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
224        }
225
226        #[test]
227        fn test_try_from_succeeds_with_provided_state_root() {
228            // When sequencer provides non-zero state_root, conversion succeeds
229            let factory = TestFlashBlockFactory::new();
230            let provided_state_root = B256::random();
231            let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
232
233            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
234
235            let result = OpExecutionData::try_from(&sequence);
236            assert!(result.is_ok());
237
238            let mut data = result.unwrap();
239            assert_eq!(data.payload.as_v1_mut().state_root, provided_state_root);
240        }
241
242        #[test]
243        fn test_try_from_execution_outcome_overrides_provided_state_root() {
244            // execution_outcome takes precedence over sequencer-provided state_root
245            let factory = TestFlashBlockFactory::new();
246            let provided_state_root = B256::random();
247            let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
248
249            let execution_outcome = SequenceExecutionOutcome {
250                block_hash: B256::random(),
251                state_root: B256::random(), // Different from provided
252            };
253
254            let sequence =
255                FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
256
257            let result = OpExecutionData::try_from(&sequence);
258            assert!(result.is_ok());
259
260            let mut data = result.unwrap();
261            // Should use execution_outcome, not the provided state_root
262            assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
263            assert_ne!(data.payload.as_v1_mut().state_root, provided_state_root);
264        }
265
266        #[test]
267        fn test_try_from_with_multiple_flashblocks() {
268            // Test conversion with sequence of multiple flashblocks
269            let factory = TestFlashBlockFactory::new();
270            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
271            let fb1 = factory.flashblock_after(&fb0).state_root(B256::ZERO).build();
272            let fb2 = factory.flashblock_after(&fb1).state_root(B256::ZERO).build();
273
274            let execution_outcome =
275                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
276
277            let sequence =
278                FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], Some(execution_outcome))
279                    .unwrap();
280
281            let result = OpExecutionData::try_from(&sequence);
282            assert!(result.is_ok());
283
284            let mut data = result.unwrap();
285            assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
286            assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
287        }
288    }
289
290    mod consensus_client_creation {
291        use super::*;
292        use tokio::sync::broadcast;
293
294        #[test]
295        fn test_new_creates_client() {
296            let (engine_tx, _) = tokio::sync::mpsc::unbounded_channel();
297            let engine_handle = ConsensusEngineHandle::<OpPayloadTypes>::new(engine_tx);
298
299            let (_, sequence_rx) = broadcast::channel(1);
300
301            let result = FlashBlockConsensusClient::new(engine_handle, sequence_rx);
302            assert!(result.is_ok());
303        }
304    }
305
306    mod submit_new_payload_behavior {
307        use super::*;
308
309        #[test]
310        fn test_submit_new_payload_returns_parent_hash_when_no_state_root() {
311            // When conversion fails (no state_root), should return parent_hash
312            let factory = TestFlashBlockFactory::new();
313            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
314            let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
315
316            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
317
318            // Verify conversion would fail
319            let conversion_result = OpExecutionData::try_from(&sequence);
320            assert!(conversion_result.is_err());
321
322            // In the actual run loop, submit_new_payload would return parent_hash
323            assert_eq!(sequence.payload_base().parent_hash, parent_hash);
324        }
325
326        #[test]
327        fn test_submit_new_payload_returns_block_hash_when_state_root_available() {
328            // When conversion succeeds, should return the new block's hash
329            let factory = TestFlashBlockFactory::new();
330            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
331
332            let execution_outcome =
333                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
334
335            let sequence =
336                FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
337
338            // Verify conversion succeeds
339            let conversion_result = OpExecutionData::try_from(&sequence);
340            assert!(conversion_result.is_ok());
341
342            let mut data = conversion_result.unwrap();
343            assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
344        }
345    }
346
347    mod forkchoice_update_behavior {
348        use super::*;
349
350        #[test]
351        fn test_forkchoice_state_uses_parent_hash_for_safe_and_finalized() {
352            // Both safe_hash and finalized_hash should be set to parent_hash
353            let factory = TestFlashBlockFactory::new();
354            let fb0 = factory.flashblock_at(0).build();
355            let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
356
357            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
358
359            // Verify the expected forkchoice state
360            assert_eq!(sequence.payload_base().parent_hash, parent_hash);
361        }
362
363        #[test]
364        fn test_forkchoice_update_with_new_block_hash() {
365            // When newPayload succeeds, FCU should use the new block's hash as head
366            let factory = TestFlashBlockFactory::new();
367            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
368
369            let execution_outcome =
370                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
371
372            let sequence =
373                FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
374
375            // The head_block_hash for FCU would be execution_outcome.block_hash
376            assert_eq!(
377                sequence.execution_outcome().unwrap().block_hash,
378                execution_outcome.block_hash
379            );
380        }
381
382        #[test]
383        fn test_forkchoice_update_with_parent_hash_when_no_state_root() {
384            // When newPayload is skipped (no state_root), FCU should use parent_hash as head
385            let factory = TestFlashBlockFactory::new();
386            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
387            let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
388
389            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
390
391            // The head_block_hash for FCU would be parent_hash (fallback)
392            assert_eq!(sequence.payload_base().parent_hash, parent_hash);
393        }
394    }
395
396    mod run_loop_logic {
397        use super::*;
398
399        #[test]
400        fn test_run_loop_processes_sequence_with_state_root() {
401            // Scenario: Sequence with state_root should trigger both newPayload and FCU
402            let factory = TestFlashBlockFactory::new();
403            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
404
405            let execution_outcome =
406                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
407
408            let sequence =
409                FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
410
411            // Verify sequence is ready for newPayload
412            let conversion = OpExecutionData::try_from(&sequence);
413            assert!(conversion.is_ok());
414        }
415
416        #[test]
417        fn test_run_loop_processes_sequence_without_state_root() {
418            // Scenario: Sequence without state_root should skip newPayload but still do FCU
419            let factory = TestFlashBlockFactory::new();
420            let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
421
422            let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
423
424            // Verify sequence cannot be converted (newPayload will be skipped)
425            let conversion = OpExecutionData::try_from(&sequence);
426            assert!(conversion.is_err());
427
428            // But FCU should still happen with parent_hash
429            assert!(sequence.payload_base().parent_hash != B256::ZERO);
430        }
431
432        #[test]
433        fn test_run_loop_handles_multiple_sequences() {
434            // Multiple sequences should be processed independently
435            let factory = TestFlashBlockFactory::new();
436
437            // Sequence 1: With state_root
438            let fb0_seq1 = factory.flashblock_at(0).state_root(B256::ZERO).build();
439            let outcome1 =
440                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
441            let seq1 =
442                FlashBlockCompleteSequence::new(vec![fb0_seq1.clone()], Some(outcome1)).unwrap();
443
444            // Sequence 2: Without state_root (for next block)
445            let fb0_seq2 = factory.flashblock_for_next_block(&fb0_seq1).build();
446            let seq2 = FlashBlockCompleteSequence::new(vec![fb0_seq2], None).unwrap();
447
448            // Both should be valid sequences
449            assert_eq!(seq1.block_number(), 100);
450            assert_eq!(seq2.block_number(), 101);
451
452            // seq1 can be converted
453            assert!(OpExecutionData::try_from(&seq1).is_ok());
454            // seq2 cannot be converted
455            assert!(OpExecutionData::try_from(&seq2).is_err());
456        }
457    }
458}