reth_optimism_flashblocks/
consensus.rs1use crate::FlashBlockCompleteSequenceRx;
2use alloy_primitives::B256;
3use reth_engine_primitives::ConsensusEngineHandle;
4use reth_optimism_payload_builder::OpPayloadTypes;
5use reth_payload_primitives::EngineApiMessageVersion;
6use ringbuffer::{AllocRingBuffer, RingBuffer};
7use tracing::warn;
8
9#[derive(Debug)]
13pub struct FlashBlockConsensusClient {
14 engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
16 sequence_receiver: FlashBlockCompleteSequenceRx,
17}
18
19impl FlashBlockConsensusClient {
20 pub const fn new(
22 engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
23 sequence_receiver: FlashBlockCompleteSequenceRx,
24 ) -> eyre::Result<Self> {
25 Ok(Self { engine_handle, sequence_receiver })
26 }
27
28 fn get_previous_block_hash(
31 &self,
32 previous_block_hashes: &AllocRingBuffer<B256>,
33 offset: usize,
34 ) -> B256 {
35 *previous_block_hashes
36 .len()
37 .checked_sub(offset)
38 .and_then(|index| previous_block_hashes.get(index))
39 .unwrap_or_default()
40 }
41
42 pub async fn run(mut self) {
45 let mut previous_block_hashes = AllocRingBuffer::new(64);
46
47 loop {
48 match self.sequence_receiver.recv().await {
49 Ok(sequence) => {
50 let block_hash = sequence.payload_base().parent_hash;
51 previous_block_hashes.push(block_hash);
52
53 if sequence.state_root().is_none() {
54 warn!("Missing state root for the complete sequence")
55 }
56
57 let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
60 let finalized_block_hash =
61 self.get_previous_block_hash(&previous_block_hashes, 64);
62
63 let state = alloy_rpc_types_engine::ForkchoiceState {
64 head_block_hash: block_hash,
65 safe_block_hash,
66 finalized_block_hash,
67 };
68
69 let _ = self
71 .engine_handle
72 .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
73 .await;
74 }
75 Err(err) => {
76 warn!(
77 target: "consensus::flashblock-client",
78 %err,
79 "error while fetching flashblock completed sequence"
80 );
81 break;
82 }
83 }
84 }
85 }
86}