reth_optimism_flashblocks/
consensus.rs1use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx};
2use alloy_primitives::B256;
3use alloy_rpc_types_engine::PayloadStatusEnum;
4use op_alloy_rpc_types_engine::OpExecutionData;
5use reth_engine_primitives::ConsensusEngineHandle;
6use reth_optimism_payload_builder::OpPayloadTypes;
7use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes};
8use tracing::*;
9
10#[derive(Debug)]
18pub struct FlashBlockConsensusClient<P = OpPayloadTypes>
19where
20 P: PayloadTypes,
21{
22 engine_handle: ConsensusEngineHandle<P>,
24 sequence_receiver: FlashBlockCompleteSequenceRx,
26}
27
28impl<P> FlashBlockConsensusClient<P>
29where
30 P: PayloadTypes,
31 P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>,
32{
33 pub const fn new(
35 engine_handle: ConsensusEngineHandle<P>,
36 sequence_receiver: FlashBlockCompleteSequenceRx,
37 ) -> eyre::Result<Self> {
38 Ok(Self { engine_handle, sequence_receiver })
39 }
40
41 async fn submit_new_payload(&self, sequence: &FlashBlockCompleteSequence) -> B256 {
48 let payload = match P::ExecutionData::try_from(sequence) {
49 Ok(payload) => payload,
50 Err(err) => {
51 trace!(target: "flashblocks", %err, "Failed payload conversion, using parent hash");
52 return sequence.payload_base().parent_hash;
53 }
54 };
55
56 let block_number = payload.block_number();
57 let block_hash = payload.block_hash();
58 match self.engine_handle.new_payload(payload).await {
59 Ok(result) => {
60 debug!(
61 target: "flashblocks",
62 flashblock_count = sequence.count(),
63 block_number,
64 %block_hash,
65 ?result,
66 "Submitted engine_newPayload",
67 );
68
69 if let PayloadStatusEnum::Invalid { validation_error } = result.status {
70 debug!(
71 target: "flashblocks",
72 block_number,
73 %block_hash,
74 %validation_error,
75 "Payload validation error",
76 );
77 };
78 }
79 Err(err) => {
80 error!(
81 target: "flashblocks",
82 %err,
83 block_number,
84 "Failed to submit new payload",
85 );
86 }
87 }
88
89 block_hash
90 }
91
92 async fn submit_forkchoice_update(
94 &self,
95 head_block_hash: B256,
96 sequence: &FlashBlockCompleteSequence,
97 ) {
98 let block_number = sequence.block_number();
99 let safe_hash = sequence.payload_base().parent_hash;
100 let finalized_hash = sequence.payload_base().parent_hash;
101 let fcu_state = alloy_rpc_types_engine::ForkchoiceState {
102 head_block_hash,
103 safe_block_hash: safe_hash,
104 finalized_block_hash: finalized_hash,
105 };
106
107 match self
108 .engine_handle
109 .fork_choice_updated(fcu_state, None, EngineApiMessageVersion::V5)
110 .await
111 {
112 Ok(result) => {
113 debug!(
114 target: "flashblocks",
115 flashblock_count = sequence.count(),
116 block_number,
117 %head_block_hash,
118 %safe_hash,
119 %finalized_hash,
120 ?result,
121 "Submitted engine_forkChoiceUpdated",
122 )
123 }
124 Err(err) => {
125 error!(
126 target: "flashblocks",
127 %err,
128 block_number,
129 %head_block_hash,
130 %safe_hash,
131 %finalized_hash,
132 "Failed to submit fork choice update",
133 );
134 }
135 }
136 }
137
138 pub async fn run(mut self) {
145 loop {
146 let Ok(sequence) = self.sequence_receiver.recv().await else {
147 continue;
148 };
149
150 let block_hash = self.submit_new_payload(&sequence).await;
154
155 self.submit_forkchoice_update(block_hash, &sequence).await;
156 }
157 }
158}
159
160impl TryFrom<&FlashBlockCompleteSequence> for OpExecutionData {
161 type Error = &'static str;
162
163 fn try_from(sequence: &FlashBlockCompleteSequence) -> Result<Self, Self::Error> {
164 let mut data = Self::from_flashblocks_unchecked(sequence);
165
166 if let Some(execution_outcome) = sequence.execution_outcome() {
169 let payload = data.payload.as_v1_mut();
170 payload.state_root = execution_outcome.state_root;
171 payload.block_hash = execution_outcome.block_hash;
172 }
173
174 if data.payload.as_v1_mut().state_root == B256::ZERO {
176 return Err("No state_root available for payload");
177 }
178
179 Ok(data)
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use super::*;
186 use crate::{sequence::SequenceExecutionOutcome, test_utils::TestFlashBlockFactory};
187
188 mod op_execution_data_conversion {
189 use super::*;
190
191 #[test]
192 fn test_try_from_fails_with_zero_state_root() {
193 let factory = TestFlashBlockFactory::new();
195 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
196
197 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
198
199 let result = OpExecutionData::try_from(&sequence);
200 assert!(result.is_err());
201 assert_eq!(result.unwrap_err(), "No state_root available for payload");
202 }
203
204 #[test]
205 fn test_try_from_succeeds_with_execution_outcome() {
206 let factory = TestFlashBlockFactory::new();
208 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
209
210 let execution_outcome = SequenceExecutionOutcome {
211 block_hash: B256::random(),
212 state_root: B256::random(), };
214
215 let sequence =
216 FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
217
218 let result = OpExecutionData::try_from(&sequence);
219 assert!(result.is_ok());
220
221 let mut data = result.unwrap();
222 assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
223 assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
224 }
225
226 #[test]
227 fn test_try_from_succeeds_with_provided_state_root() {
228 let factory = TestFlashBlockFactory::new();
230 let provided_state_root = B256::random();
231 let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
232
233 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
234
235 let result = OpExecutionData::try_from(&sequence);
236 assert!(result.is_ok());
237
238 let mut data = result.unwrap();
239 assert_eq!(data.payload.as_v1_mut().state_root, provided_state_root);
240 }
241
242 #[test]
243 fn test_try_from_execution_outcome_overrides_provided_state_root() {
244 let factory = TestFlashBlockFactory::new();
246 let provided_state_root = B256::random();
247 let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
248
249 let execution_outcome = SequenceExecutionOutcome {
250 block_hash: B256::random(),
251 state_root: B256::random(), };
253
254 let sequence =
255 FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
256
257 let result = OpExecutionData::try_from(&sequence);
258 assert!(result.is_ok());
259
260 let mut data = result.unwrap();
261 assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
263 assert_ne!(data.payload.as_v1_mut().state_root, provided_state_root);
264 }
265
266 #[test]
267 fn test_try_from_with_multiple_flashblocks() {
268 let factory = TestFlashBlockFactory::new();
270 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
271 let fb1 = factory.flashblock_after(&fb0).state_root(B256::ZERO).build();
272 let fb2 = factory.flashblock_after(&fb1).state_root(B256::ZERO).build();
273
274 let execution_outcome =
275 SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
276
277 let sequence =
278 FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], Some(execution_outcome))
279 .unwrap();
280
281 let result = OpExecutionData::try_from(&sequence);
282 assert!(result.is_ok());
283
284 let mut data = result.unwrap();
285 assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
286 assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
287 }
288 }
289
290 mod consensus_client_creation {
291 use super::*;
292 use tokio::sync::broadcast;
293
294 #[test]
295 fn test_new_creates_client() {
296 let (engine_tx, _) = tokio::sync::mpsc::unbounded_channel();
297 let engine_handle = ConsensusEngineHandle::<OpPayloadTypes>::new(engine_tx);
298
299 let (_, sequence_rx) = broadcast::channel(1);
300
301 let result = FlashBlockConsensusClient::new(engine_handle, sequence_rx);
302 assert!(result.is_ok());
303 }
304 }
305
306 mod submit_new_payload_behavior {
307 use super::*;
308
309 #[test]
310 fn test_submit_new_payload_returns_parent_hash_when_no_state_root() {
311 let factory = TestFlashBlockFactory::new();
313 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
314 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
315
316 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
317
318 let conversion_result = OpExecutionData::try_from(&sequence);
320 assert!(conversion_result.is_err());
321
322 assert_eq!(sequence.payload_base().parent_hash, parent_hash);
324 }
325
326 #[test]
327 fn test_submit_new_payload_returns_block_hash_when_state_root_available() {
328 let factory = TestFlashBlockFactory::new();
330 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
331
332 let execution_outcome =
333 SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
334
335 let sequence =
336 FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
337
338 let conversion_result = OpExecutionData::try_from(&sequence);
340 assert!(conversion_result.is_ok());
341
342 let mut data = conversion_result.unwrap();
343 assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
344 }
345 }
346
347 mod forkchoice_update_behavior {
348 use super::*;
349
350 #[test]
351 fn test_forkchoice_state_uses_parent_hash_for_safe_and_finalized() {
352 let factory = TestFlashBlockFactory::new();
354 let fb0 = factory.flashblock_at(0).build();
355 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
356
357 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
358
359 assert_eq!(sequence.payload_base().parent_hash, parent_hash);
361 }
362
363 #[test]
364 fn test_forkchoice_update_with_new_block_hash() {
365 let factory = TestFlashBlockFactory::new();
367 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
368
369 let execution_outcome =
370 SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
371
372 let sequence =
373 FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
374
375 assert_eq!(
377 sequence.execution_outcome().unwrap().block_hash,
378 execution_outcome.block_hash
379 );
380 }
381
382 #[test]
383 fn test_forkchoice_update_with_parent_hash_when_no_state_root() {
384 let factory = TestFlashBlockFactory::new();
386 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
387 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
388
389 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
390
391 assert_eq!(sequence.payload_base().parent_hash, parent_hash);
393 }
394 }
395
396 mod run_loop_logic {
397 use super::*;
398
399 #[test]
400 fn test_run_loop_processes_sequence_with_state_root() {
401 let factory = TestFlashBlockFactory::new();
403 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
404
405 let execution_outcome =
406 SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
407
408 let sequence =
409 FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
410
411 let conversion = OpExecutionData::try_from(&sequence);
413 assert!(conversion.is_ok());
414 }
415
416 #[test]
417 fn test_run_loop_processes_sequence_without_state_root() {
418 let factory = TestFlashBlockFactory::new();
420 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
421
422 let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
423
424 let conversion = OpExecutionData::try_from(&sequence);
426 assert!(conversion.is_err());
427
428 assert!(sequence.payload_base().parent_hash != B256::ZERO);
430 }
431
432 #[test]
433 fn test_run_loop_handles_multiple_sequences() {
434 let factory = TestFlashBlockFactory::new();
436
437 let fb0_seq1 = factory.flashblock_at(0).state_root(B256::ZERO).build();
439 let outcome1 =
440 SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
441 let seq1 =
442 FlashBlockCompleteSequence::new(vec![fb0_seq1.clone()], Some(outcome1)).unwrap();
443
444 let fb0_seq2 = factory.flashblock_for_next_block(&fb0_seq1).build();
446 let seq2 = FlashBlockCompleteSequence::new(vec![fb0_seq2], None).unwrap();
447
448 assert_eq!(seq1.block_number(), 100);
450 assert_eq!(seq2.block_number(), 101);
451
452 assert!(OpExecutionData::try_from(&seq1).is_ok());
454 assert!(OpExecutionData::try_from(&seq2).is_err());
456 }
457 }
458}