reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11    BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16    future::Future,
17    pin::Pin,
18    task::{Context, Poll},
19    time::{Duration, UNIX_EPOCH},
20};
21use tokio::{
22    sync::{mpsc::UnboundedSender, oneshot},
23    time::Interval,
24};
25use tokio_stream::wrappers::ReceiverStream;
26use tracing::error;
27
28/// A mining mode for the local dev engine.
29#[derive(Debug)]
30pub enum MiningMode {
31    /// In this mode a block is built as soon as
32    /// a valid transaction reaches the pool.
33    Instant(Fuse<ReceiverStream<TxHash>>),
34    /// In this mode a block is built at a fixed interval.
35    Interval(Interval),
36}
37
38impl MiningMode {
39    /// Constructor for a [`MiningMode::Instant`]
40    pub fn instant<Pool: TransactionPool>(pool: Pool) -> Self {
41        let rx = pool.pending_transactions_listener();
42        Self::Instant(ReceiverStream::new(rx).fuse())
43    }
44
45    /// Constructor for a [`MiningMode::Interval`]
46    pub fn interval(duration: Duration) -> Self {
47        let start = tokio::time::Instant::now() + duration;
48        Self::Interval(tokio::time::interval_at(start, duration))
49    }
50}
51
52impl Future for MiningMode {
53    type Output = ();
54
55    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56        let this = self.get_mut();
57        match this {
58            Self::Instant(rx) => {
59                // drain all transactions notifications
60                if let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
61                    return Poll::Ready(())
62                }
63                Poll::Pending
64            }
65            Self::Interval(interval) => {
66                if interval.poll_tick(cx).is_ready() {
67                    return Poll::Ready(())
68                }
69                Poll::Pending
70            }
71        }
72    }
73}
74
75/// Local miner advancing the chain/
76#[derive(Debug)]
77pub struct LocalMiner<EngineT: EngineTypes, B> {
78    /// The payload attribute builder for the engine
79    payload_attributes_builder: B,
80    /// Sender for events to engine.
81    to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
82    /// The mining mode for the engine
83    mode: MiningMode,
84    /// The payload builder for the engine
85    payload_builder: PayloadBuilderHandle<EngineT>,
86    /// Timestamp for the next block.
87    last_timestamp: u64,
88    /// Stores latest mined blocks.
89    last_block_hashes: Vec<B256>,
90}
91
92impl<EngineT, B> LocalMiner<EngineT, B>
93where
94    EngineT: EngineTypes,
95    B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
96{
97    /// Spawns a new [`LocalMiner`] with the given parameters.
98    pub fn spawn_new(
99        provider: impl BlockReader,
100        payload_attributes_builder: B,
101        to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
102        mode: MiningMode,
103        payload_builder: PayloadBuilderHandle<EngineT>,
104    ) {
105        let latest_header =
106            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
107
108        let miner = Self {
109            payload_attributes_builder,
110            to_engine,
111            mode,
112            payload_builder,
113            last_timestamp: latest_header.timestamp(),
114            last_block_hashes: vec![latest_header.hash()],
115        };
116
117        // Spawn the miner
118        tokio::spawn(miner.run());
119    }
120
121    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
122    async fn run(mut self) {
123        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
124        loop {
125            tokio::select! {
126                // Wait for the interval or the pool to receive a transaction
127                _ = &mut self.mode => {
128                    if let Err(e) = self.advance().await {
129                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
130                    }
131                }
132                // send FCU once in a while
133                _ = fcu_interval.tick() => {
134                    if let Err(e) = self.update_forkchoice_state().await {
135                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
136                    }
137                }
138            }
139        }
140    }
141
142    /// Returns current forkchoice state.
143    fn forkchoice_state(&self) -> ForkchoiceState {
144        ForkchoiceState {
145            head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
146            safe_block_hash: *self
147                .last_block_hashes
148                .get(self.last_block_hashes.len().saturating_sub(32))
149                .expect("at least 1 block exists"),
150            finalized_block_hash: *self
151                .last_block_hashes
152                .get(self.last_block_hashes.len().saturating_sub(64))
153                .expect("at least 1 block exists"),
154        }
155    }
156
157    /// Sends a FCU to the engine.
158    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
159        let (tx, rx) = oneshot::channel();
160        self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
161            state: self.forkchoice_state(),
162            payload_attrs: None,
163            tx,
164            version: EngineApiMessageVersion::default(),
165        })?;
166
167        let res = rx.await??;
168        if !res.forkchoice_status().is_valid() {
169            eyre::bail!("Invalid fork choice update")
170        }
171
172        Ok(())
173    }
174
175    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
176    /// through newPayload.
177    async fn advance(&mut self) -> eyre::Result<()> {
178        let timestamp = std::cmp::max(
179            self.last_timestamp + 1,
180            std::time::SystemTime::now()
181                .duration_since(UNIX_EPOCH)
182                .expect("cannot be earlier than UNIX_EPOCH")
183                .as_secs(),
184        );
185
186        let (tx, rx) = oneshot::channel();
187        self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
188            state: self.forkchoice_state(),
189            payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
190            tx,
191            version: EngineApiMessageVersion::default(),
192        })?;
193
194        let res = rx.await??.await?;
195        if !res.payload_status.is_valid() {
196            eyre::bail!("Invalid payload status")
197        }
198
199        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
200
201        let Some(Ok(payload)) =
202            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
203        else {
204            eyre::bail!("No payload")
205        };
206
207        let block = payload.block();
208
209        let (tx, rx) = oneshot::channel();
210        let payload = EngineT::block_to_payload(payload.block().clone());
211        self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx })?;
212
213        let res = rx.await??;
214
215        if !res.is_valid() {
216            eyre::bail!("Invalid payload")
217        }
218
219        self.last_timestamp = timestamp;
220        self.last_block_hashes.push(block.hash());
221        // ensure we keep at most 64 blocks
222        if self.last_block_hashes.len() > 64 {
223            self.last_block_hashes =
224                self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
225        }
226
227        Ok(())
228    }
229}