reth_optimism_flashblocks/
consensus.rs

1use crate::FlashBlockCompleteSequenceRx;
2use alloy_primitives::B256;
3use reth_node_api::{ConsensusEngineHandle, EngineApiMessageVersion};
4use reth_optimism_payload_builder::OpPayloadTypes;
5use ringbuffer::{AllocRingBuffer, RingBuffer};
6use tracing::warn;
7
/// Consensus client that sends forkchoice updates (FCUs) to the engine using blocks from a
/// [`FlashBlockService`]
///
/// The client is driven by [`FlashBlockConsensusClient::run`], which consumes it.
///
/// [`FlashBlockService`]: crate::FlashBlockService
#[derive(Debug)]
pub struct FlashBlockConsensusClient {
    /// Handle to execution client.
    engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
    /// Receiver of completed flashblock sequences; each received sequence triggers one FCU.
    sequence_receiver: FlashBlockCompleteSequenceRx,
}
17
18impl FlashBlockConsensusClient {
19    /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
20    pub const fn new(
21        engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
22        sequence_receiver: FlashBlockCompleteSequenceRx,
23    ) -> eyre::Result<Self> {
24        Ok(Self { engine_handle, sequence_receiver })
25    }
26
27    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
28    /// started more recently than `offset`), return default zero hash
29    fn get_previous_block_hash(
30        &self,
31        previous_block_hashes: &AllocRingBuffer<B256>,
32        offset: usize,
33    ) -> B256 {
34        *previous_block_hashes
35            .len()
36            .checked_sub(offset)
37            .and_then(|index| previous_block_hashes.get(index))
38            .unwrap_or_default()
39    }
40
41    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
42    /// blocks.
43    pub async fn run(mut self) {
44        let mut previous_block_hashes = AllocRingBuffer::new(64);
45
46        loop {
47            match self.sequence_receiver.recv().await {
48                Ok(sequence) => {
49                    let block_hash = sequence.payload_base().parent_hash;
50                    previous_block_hashes.push(block_hash);
51
52                    if sequence.state_root().is_none() {
53                        warn!("Missing state root for the complete sequence")
54                    }
55
56                    // Load previous block hashes. We're using (head - 32) and (head - 64) as the
57                    // safe and finalized block hashes.
58                    let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
59                    let finalized_block_hash =
60                        self.get_previous_block_hash(&previous_block_hashes, 64);
61
62                    let state = alloy_rpc_types_engine::ForkchoiceState {
63                        head_block_hash: block_hash,
64                        safe_block_hash,
65                        finalized_block_hash,
66                    };
67
68                    // Send FCU
69                    let _ = self
70                        .engine_handle
71                        .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
72                        .await;
73                }
74                Err(err) => {
75                    warn!(
76                        target: "consensus::flashblock-client",
77                        %err,
78                        "error while fetching flashblock completed sequence"
79                    );
80                    break;
81                }
82            }
83        }
84    }
85}