reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::ConsensusEngineHandle;
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11    BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16    collections::VecDeque,
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20    time::{Duration, UNIX_EPOCH},
21};
22use tokio::time::Interval;
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::error;
25
26/// A mining mode for the local dev engine.
27#[derive(Debug)]
28pub enum MiningMode<Pool: TransactionPool + Unpin> {
29    /// In this mode a block is built as soon as
30    /// a valid transaction reaches the pool.
31    /// If `max_transactions` is set, a block is built when that many transactions have
32    /// accumulated.
33    Instant {
34        /// The transaction pool.
35        pool: Pool,
36        /// Stream of transaction notifications.
37        rx: Fuse<ReceiverStream<TxHash>>,
38        /// Maximum number of transactions to accumulate before mining a block.
39        /// If None, mine immediately when any transaction arrives.
40        max_transactions: Option<usize>,
41        /// Counter for accumulated transactions (only used when `max_transactions` is set).
42        accumulated: usize,
43    },
44    /// In this mode a block is built at a fixed interval.
45    Interval(Interval),
46}
47
48impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
49    /// Constructor for a [`MiningMode::Instant`]
50    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
51        let rx = pool.pending_transactions_listener();
52        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
53    }
54
55    /// Constructor for a [`MiningMode::Interval`]
56    pub fn interval(duration: Duration) -> Self {
57        let start = tokio::time::Instant::now() + duration;
58        Self::Interval(tokio::time::interval_at(start, duration))
59    }
60}
61
62impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
63    type Output = ();
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let this = self.get_mut();
67        match this {
68            Self::Instant { pool, rx, max_transactions, accumulated } => {
69                // Poll for new transaction notifications
70                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
71                    if pool.pending_and_queued_txn_count().0 == 0 {
72                        continue;
73                    }
74                    if let Some(max_tx) = max_transactions {
75                        *accumulated += 1;
76                        // If we've reached the max transactions threshold, mine a block
77                        if *accumulated >= *max_tx {
78                            *accumulated = 0; // Reset counter for next block
79                            return Poll::Ready(());
80                        }
81                    } else {
82                        // If no max_transactions is set, mine immediately
83                        return Poll::Ready(());
84                    }
85                }
86                Poll::Pending
87            }
88            Self::Interval(interval) => {
89                if interval.poll_tick(cx).is_ready() {
90                    return Poll::Ready(())
91                }
92                Poll::Pending
93            }
94        }
95    }
96}
97
98/// Local miner advancing the chain
99#[derive(Debug)]
100pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
101    /// The payload attribute builder for the engine
102    payload_attributes_builder: B,
103    /// Sender for events to engine.
104    to_engine: ConsensusEngineHandle<T>,
105    /// The mining mode for the engine
106    mode: MiningMode<Pool>,
107    /// The payload builder for the engine
108    payload_builder: PayloadBuilderHandle<T>,
109    /// Timestamp for the next block.
110    last_timestamp: u64,
111    /// Stores latest mined blocks.
112    last_block_hashes: VecDeque<B256>,
113}
114
115impl<T, B, Pool> LocalMiner<T, B, Pool>
116where
117    T: PayloadTypes,
118    B: PayloadAttributesBuilder<<T as PayloadTypes>::PayloadAttributes>,
119    Pool: TransactionPool + Unpin,
120{
121    /// Spawns a new [`LocalMiner`] with the given parameters.
122    pub fn new(
123        provider: impl BlockReader,
124        payload_attributes_builder: B,
125        to_engine: ConsensusEngineHandle<T>,
126        mode: MiningMode<Pool>,
127        payload_builder: PayloadBuilderHandle<T>,
128    ) -> Self {
129        let latest_header =
130            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
131
132        Self {
133            payload_attributes_builder,
134            to_engine,
135            mode,
136            payload_builder,
137            last_timestamp: latest_header.timestamp(),
138            last_block_hashes: VecDeque::from([latest_header.hash()]),
139        }
140    }
141
142    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
143    pub async fn run(mut self) {
144        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
145        loop {
146            tokio::select! {
147                // Wait for the interval or the pool to receive a transaction
148                _ = &mut self.mode => {
149                    if let Err(e) = self.advance().await {
150                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
151                    }
152                }
153                // send FCU once in a while
154                _ = fcu_interval.tick() => {
155                    if let Err(e) = self.update_forkchoice_state().await {
156                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
157                    }
158                }
159            }
160        }
161    }
162
163    /// Returns current forkchoice state.
164    fn forkchoice_state(&self) -> ForkchoiceState {
165        ForkchoiceState {
166            head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
167            safe_block_hash: *self
168                .last_block_hashes
169                .get(self.last_block_hashes.len().saturating_sub(32))
170                .expect("at least 1 block exists"),
171            finalized_block_hash: *self
172                .last_block_hashes
173                .get(self.last_block_hashes.len().saturating_sub(64))
174                .expect("at least 1 block exists"),
175        }
176    }
177
178    /// Sends a FCU to the engine.
179    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
180        let state = self.forkchoice_state();
181        let res = self
182            .to_engine
183            .fork_choice_updated(state, None, EngineApiMessageVersion::default())
184            .await?;
185
186        if !res.is_valid() {
187            eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
188        }
189
190        Ok(())
191    }
192
193    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
194    /// through newPayload.
195    async fn advance(&mut self) -> eyre::Result<()> {
196        let timestamp = std::cmp::max(
197            self.last_timestamp.saturating_add(1),
198            std::time::SystemTime::now()
199                .duration_since(UNIX_EPOCH)
200                .expect("cannot be earlier than UNIX_EPOCH")
201                .as_secs(),
202        );
203
204        let res = self
205            .to_engine
206            .fork_choice_updated(
207                self.forkchoice_state(),
208                Some(self.payload_attributes_builder.build(timestamp)),
209                EngineApiMessageVersion::default(),
210            )
211            .await?;
212
213        if !res.is_valid() {
214            eyre::bail!("Invalid payload status")
215        }
216
217        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
218
219        let Some(Ok(payload)) =
220            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
221        else {
222            eyre::bail!("No payload")
223        };
224
225        let block = payload.block();
226
227        let payload = T::block_to_payload(payload.block().clone());
228        let res = self.to_engine.new_payload(payload).await?;
229
230        if !res.is_valid() {
231            eyre::bail!("Invalid payload")
232        }
233
234        self.last_timestamp = timestamp;
235        self.last_block_hashes.push_back(block.hash());
236        // ensure we keep at most 64 blocks
237        if self.last_block_hashes.len() > 64 {
238            self.last_block_hashes.pop_front();
239        }
240
241        Ok(())
242    }
243}