reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::ConsensusEngineHandle;
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11    BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16    future::Future,
17    pin::Pin,
18    task::{Context, Poll},
19    time::{Duration, UNIX_EPOCH},
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25/// A mining mode for the local dev engine.
26#[derive(Debug)]
27pub enum MiningMode<Pool: TransactionPool + Unpin> {
28    /// In this mode a block is built as soon as
29    /// a valid transaction reaches the pool.
30    /// If `max_transactions` is set, a block is built when that many transactions have
31    /// accumulated.
32    Instant {
33        /// The transaction pool.
34        pool: Pool,
35        /// Stream of transaction notifications.
36        rx: Fuse<ReceiverStream<TxHash>>,
37        /// Maximum number of transactions to accumulate before mining a block.
38        /// If None, mine immediately when any transaction arrives.
39        max_transactions: Option<usize>,
40        /// Counter for accumulated transactions (only used when `max_transactions` is set).
41        accumulated: usize,
42    },
43    /// In this mode a block is built at a fixed interval.
44    Interval(Interval),
45}
46
47impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
48    /// Constructor for a [`MiningMode::Instant`]
49    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
50        let rx = pool.pending_transactions_listener();
51        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
52    }
53
54    /// Constructor for a [`MiningMode::Interval`]
55    pub fn interval(duration: Duration) -> Self {
56        let start = tokio::time::Instant::now() + duration;
57        Self::Interval(tokio::time::interval_at(start, duration))
58    }
59}
60
61impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
62    type Output = ();
63
64    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65        let this = self.get_mut();
66        match this {
67            Self::Instant { pool, rx, max_transactions, accumulated } => {
68                // Poll for new transaction notifications
69                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
70                    if pool.pending_and_queued_txn_count().0 == 0 {
71                        continue;
72                    }
73                    if let Some(max_tx) = max_transactions {
74                        *accumulated += 1;
75                        // If we've reached the max transactions threshold, mine a block
76                        if *accumulated >= *max_tx {
77                            *accumulated = 0; // Reset counter for next block
78                            return Poll::Ready(());
79                        }
80                    } else {
81                        // If no max_transactions is set, mine immediately
82                        return Poll::Ready(());
83                    }
84                }
85                Poll::Pending
86            }
87            Self::Interval(interval) => {
88                if interval.poll_tick(cx).is_ready() {
89                    return Poll::Ready(())
90                }
91                Poll::Pending
92            }
93        }
94    }
95}
96
97/// Local miner advancing the chain
98#[derive(Debug)]
99pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
100    /// The payload attribute builder for the engine
101    payload_attributes_builder: B,
102    /// Sender for events to engine.
103    to_engine: ConsensusEngineHandle<T>,
104    /// The mining mode for the engine
105    mode: MiningMode<Pool>,
106    /// The payload builder for the engine
107    payload_builder: PayloadBuilderHandle<T>,
108    /// Timestamp for the next block.
109    last_timestamp: u64,
110    /// Stores latest mined blocks.
111    last_block_hashes: Vec<B256>,
112}
113
114impl<T, B, Pool> LocalMiner<T, B, Pool>
115where
116    T: PayloadTypes,
117    B: PayloadAttributesBuilder<<T as PayloadTypes>::PayloadAttributes>,
118    Pool: TransactionPool + Unpin,
119{
120    /// Spawns a new [`LocalMiner`] with the given parameters.
121    pub fn new(
122        provider: impl BlockReader,
123        payload_attributes_builder: B,
124        to_engine: ConsensusEngineHandle<T>,
125        mode: MiningMode<Pool>,
126        payload_builder: PayloadBuilderHandle<T>,
127    ) -> Self {
128        let latest_header =
129            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
130
131        Self {
132            payload_attributes_builder,
133            to_engine,
134            mode,
135            payload_builder,
136            last_timestamp: latest_header.timestamp(),
137            last_block_hashes: vec![latest_header.hash()],
138        }
139    }
140
141    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
142    pub async fn run(mut self) {
143        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
144        loop {
145            tokio::select! {
146                // Wait for the interval or the pool to receive a transaction
147                _ = &mut self.mode => {
148                    if let Err(e) = self.advance().await {
149                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
150                    }
151                }
152                // send FCU once in a while
153                _ = fcu_interval.tick() => {
154                    if let Err(e) = self.update_forkchoice_state().await {
155                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
156                    }
157                }
158            }
159        }
160    }
161
162    /// Returns current forkchoice state.
163    fn forkchoice_state(&self) -> ForkchoiceState {
164        ForkchoiceState {
165            head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
166            safe_block_hash: *self
167                .last_block_hashes
168                .get(self.last_block_hashes.len().saturating_sub(32))
169                .expect("at least 1 block exists"),
170            finalized_block_hash: *self
171                .last_block_hashes
172                .get(self.last_block_hashes.len().saturating_sub(64))
173                .expect("at least 1 block exists"),
174        }
175    }
176
177    /// Sends a FCU to the engine.
178    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
179        let res = self
180            .to_engine
181            .fork_choice_updated(self.forkchoice_state(), None, EngineApiMessageVersion::default())
182            .await?;
183
184        if !res.is_valid() {
185            eyre::bail!("Invalid fork choice update")
186        }
187
188        Ok(())
189    }
190
191    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
192    /// through newPayload.
193    async fn advance(&mut self) -> eyre::Result<()> {
194        let timestamp = std::cmp::max(
195            self.last_timestamp + 1,
196            std::time::SystemTime::now()
197                .duration_since(UNIX_EPOCH)
198                .expect("cannot be earlier than UNIX_EPOCH")
199                .as_secs(),
200        );
201
202        let res = self
203            .to_engine
204            .fork_choice_updated(
205                self.forkchoice_state(),
206                Some(self.payload_attributes_builder.build(timestamp)),
207                EngineApiMessageVersion::default(),
208            )
209            .await?;
210
211        if !res.is_valid() {
212            eyre::bail!("Invalid payload status")
213        }
214
215        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
216
217        let Some(Ok(payload)) =
218            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
219        else {
220            eyre::bail!("No payload")
221        };
222
223        let block = payload.block();
224
225        let payload = T::block_to_payload(payload.block().clone());
226        let res = self.to_engine.new_payload(payload).await?;
227
228        if !res.is_valid() {
229            eyre::bail!("Invalid payload")
230        }
231
232        self.last_timestamp = timestamp;
233        self.last_block_hashes.push(block.hash());
234        // ensure we keep at most 64 blocks
235        if self.last_block_hashes.len() > 64 {
236            self.last_block_hashes =
237                self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
238        }
239
240        Ok(())
241    }
242}