reth_optimism_flashblocks/
consensus.rs1use crate::FlashBlockCompleteSequenceRx;
2use alloy_primitives::B256;
3use reth_node_api::{ConsensusEngineHandle, EngineApiMessageVersion};
4use reth_optimism_payload_builder::OpPayloadTypes;
5use ringbuffer::{AllocRingBuffer, RingBuffer};
6use tracing::warn;
7
8#[derive(Debug)]
12pub struct FlashBlockConsensusClient {
13 engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
15 sequence_receiver: FlashBlockCompleteSequenceRx,
16}
17
18impl FlashBlockConsensusClient {
19 pub const fn new(
21 engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
22 sequence_receiver: FlashBlockCompleteSequenceRx,
23 ) -> eyre::Result<Self> {
24 Ok(Self { engine_handle, sequence_receiver })
25 }
26
27 fn get_previous_block_hash(
30 &self,
31 previous_block_hashes: &AllocRingBuffer<B256>,
32 offset: usize,
33 ) -> B256 {
34 *previous_block_hashes
35 .len()
36 .checked_sub(offset)
37 .and_then(|index| previous_block_hashes.get(index))
38 .unwrap_or_default()
39 }
40
41 pub async fn run(mut self) {
44 let mut previous_block_hashes = AllocRingBuffer::new(64);
45
46 loop {
47 match self.sequence_receiver.recv().await {
48 Ok(sequence) => {
49 let block_hash = sequence.payload_base().parent_hash;
50 previous_block_hashes.push(block_hash);
51
52 if sequence.state_root().is_none() {
53 warn!("Missing state root for the complete sequence")
54 }
55
56 let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
59 let finalized_block_hash =
60 self.get_previous_block_hash(&previous_block_hashes, 64);
61
62 let state = alloy_rpc_types_engine::ForkchoiceState {
63 head_block_hash: block_hash,
64 safe_block_hash,
65 finalized_block_hash,
66 };
67
68 let _ = self
70 .engine_handle
71 .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
72 .await;
73 }
74 Err(err) => {
75 warn!(
76 target: "consensus::flashblock-client",
77 %err,
78 "error while fetching flashblock completed sequence"
79 );
80 break;
81 }
82 }
83 }
84 }
85}