reth_optimism_flashblocks/
consensus.rs

1use crate::FlashBlockCompleteSequenceRx;
2use alloy_primitives::B256;
3use reth_engine_primitives::ConsensusEngineHandle;
4use reth_optimism_payload_builder::OpPayloadTypes;
5use reth_payload_primitives::EngineApiMessageVersion;
6use ringbuffer::{AllocRingBuffer, RingBuffer};
7use tracing::warn;
8
9/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`]
10///
11/// [`FlashBlockService`]: crate::FlashBlockService
12#[derive(Debug)]
13pub struct FlashBlockConsensusClient {
14    /// Handle to execution client.
15    engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
16    sequence_receiver: FlashBlockCompleteSequenceRx,
17}
18
19impl FlashBlockConsensusClient {
20    /// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
21    pub const fn new(
22        engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
23        sequence_receiver: FlashBlockCompleteSequenceRx,
24    ) -> eyre::Result<Self> {
25        Ok(Self { engine_handle, sequence_receiver })
26    }
27
28    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
29    /// started more recently than `offset`), return default zero hash
30    fn get_previous_block_hash(
31        &self,
32        previous_block_hashes: &AllocRingBuffer<B256>,
33        offset: usize,
34    ) -> B256 {
35        *previous_block_hashes
36            .len()
37            .checked_sub(offset)
38            .and_then(|index| previous_block_hashes.get(index))
39            .unwrap_or_default()
40    }
41
42    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
43    /// blocks.
44    pub async fn run(mut self) {
45        let mut previous_block_hashes = AllocRingBuffer::new(64);
46
47        loop {
48            match self.sequence_receiver.recv().await {
49                Ok(sequence) => {
50                    let block_hash = sequence.payload_base().parent_hash;
51                    previous_block_hashes.push(block_hash);
52
53                    if sequence.state_root().is_none() {
54                        warn!("Missing state root for the complete sequence")
55                    }
56
57                    // Load previous block hashes. We're using (head - 32) and (head - 64) as the
58                    // safe and finalized block hashes.
59                    let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
60                    let finalized_block_hash =
61                        self.get_previous_block_hash(&previous_block_hashes, 64);
62
63                    let state = alloy_rpc_types_engine::ForkchoiceState {
64                        head_block_hash: block_hash,
65                        safe_block_hash,
66                        finalized_block_hash,
67                    };
68
69                    // Send FCU
70                    let _ = self
71                        .engine_handle
72                        .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
73                        .await;
74                }
75                Err(err) => {
76                    warn!(
77                        target: "consensus::flashblock-client",
78                        %err,
79                        "error while fetching flashblock completed sequence"
80                    );
81                    break;
82                }
83            }
84        }
85    }
86}