Skip to main content

reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{
10    BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
11};
12use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
13use reth_storage_api::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16    collections::VecDeque,
17    fmt,
18    future::Future,
19    pin::Pin,
20    task::{Context, Poll},
21    time::Duration,
22};
23use tokio::time::Interval;
24use tokio_stream::wrappers::ReceiverStream;
25use tracing::error;
26
27/// A mining mode for the local dev engine.
28pub enum MiningMode<Pool: TransactionPool + Unpin> {
29    /// In this mode a block is built as soon as
30    /// a valid transaction reaches the pool.
31    /// If `max_transactions` is set, a block is built when that many transactions have
32    /// accumulated.
33    Instant {
34        /// The transaction pool.
35        pool: Pool,
36        /// Stream of transaction notifications.
37        rx: Fuse<ReceiverStream<TxHash>>,
38        /// Maximum number of transactions to accumulate before mining a block.
39        /// If None, mine immediately when any transaction arrives.
40        max_transactions: Option<usize>,
41        /// Counter for accumulated transactions (only used when `max_transactions` is set).
42        accumulated: usize,
43    },
44    /// In this mode a block is built at a fixed interval.
45    Interval(Interval),
46    /// In this mode a block is built when the trigger stream yields a value.
47    ///
48    /// This is a general-purpose trigger that can be fired on demand, for example via a channel
49    /// or any other [`Stream`] implementation.
50    Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
51}
52
53impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        match self {
56            Self::Instant { max_transactions, accumulated, .. } => f
57                .debug_struct("Instant")
58                .field("max_transactions", max_transactions)
59                .field("accumulated", accumulated)
60                .finish(),
61            Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
62            Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
63        }
64    }
65}
66
67impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
68    /// Constructor for a [`MiningMode::Instant`]
69    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
70        let rx = pool.pending_transactions_listener();
71        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
72    }
73
74    /// Constructor for a [`MiningMode::Interval`]
75    pub fn interval(duration: Duration) -> Self {
76        let start = tokio::time::Instant::now() + duration;
77        Self::Interval(tokio::time::interval_at(start, duration))
78    }
79
80    /// Constructor for a [`MiningMode::Trigger`]
81    ///
82    /// Accepts any stream that yields `()` values, each of which triggers a new block to be
83    /// mined. This can be backed by a channel, a custom stream, or any other async source.
84    pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
85        Self::Trigger(Box::pin(trigger))
86    }
87}
88
89impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
90    type Output = ();
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93        let this = self.get_mut();
94        match this {
95            Self::Instant { pool, rx, max_transactions, accumulated } => {
96                // Poll for new transaction notifications
97                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
98                    if pool.pending_and_queued_txn_count().0 == 0 {
99                        continue;
100                    }
101                    if let Some(max_tx) = max_transactions {
102                        *accumulated += 1;
103                        // If we've reached the max transactions threshold, mine a block
104                        if *accumulated >= *max_tx {
105                            *accumulated = 0; // Reset counter for next block
106                            return Poll::Ready(());
107                        }
108                    } else {
109                        // If no max_transactions is set, mine immediately
110                        return Poll::Ready(());
111                    }
112                }
113                Poll::Pending
114            }
115            Self::Interval(interval) => {
116                if interval.poll_tick(cx).is_ready() {
117                    return Poll::Ready(())
118                }
119                Poll::Pending
120            }
121            Self::Trigger(trigger) => {
122                if trigger.poll_next_unpin(cx).is_ready() {
123                    return Poll::Ready(())
124                }
125                Poll::Pending
126            }
127        }
128    }
129}
130
131/// Local miner advancing the chain
132#[derive(Debug)]
133pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
134    /// The payload attribute builder for the engine
135    payload_attributes_builder: B,
136    /// Sender for events to engine.
137    to_engine: ConsensusEngineHandle<T>,
138    /// The mining mode for the engine
139    mode: MiningMode<Pool>,
140    /// The payload builder for the engine
141    payload_builder: PayloadBuilderHandle<T>,
142    /// Latest block in the chain so far.
143    last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
144    /// Stores latest mined blocks.
145    last_block_hashes: VecDeque<B256>,
146}
147
148impl<T, B, Pool> LocalMiner<T, B, Pool>
149where
150    T: PayloadTypes,
151    B: PayloadAttributesBuilder<
152        T::PayloadAttributes,
153        HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
154    >,
155    Pool: TransactionPool + Unpin,
156{
157    /// Spawns a new [`LocalMiner`] with the given parameters.
158    pub fn new(
159        provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
160        payload_attributes_builder: B,
161        to_engine: ConsensusEngineHandle<T>,
162        mode: MiningMode<Pool>,
163        payload_builder: PayloadBuilderHandle<T>,
164    ) -> Self {
165        let last_header =
166            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
167
168        Self {
169            payload_attributes_builder,
170            to_engine,
171            mode,
172            payload_builder,
173            last_block_hashes: VecDeque::from([last_header.hash()]),
174            last_header,
175        }
176    }
177
178    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
179    pub async fn run(mut self) {
180        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
181        loop {
182            tokio::select! {
183                // Wait for the interval or the pool to receive a transaction
184                _ = &mut self.mode => {
185                    if let Err(e) = self.advance().await {
186                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
187                    }
188                }
189                // send FCU once in a while
190                _ = fcu_interval.tick() => {
191                    if let Err(e) = self.update_forkchoice_state().await {
192                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
193                    }
194                }
195            }
196        }
197    }
198
199    /// Returns current forkchoice state.
200    fn forkchoice_state(&self) -> ForkchoiceState {
201        ForkchoiceState {
202            head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
203            safe_block_hash: *self
204                .last_block_hashes
205                .get(self.last_block_hashes.len().saturating_sub(32))
206                .expect("at least 1 block exists"),
207            finalized_block_hash: *self
208                .last_block_hashes
209                .get(self.last_block_hashes.len().saturating_sub(64))
210                .expect("at least 1 block exists"),
211        }
212    }
213
214    /// Sends a FCU to the engine.
215    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
216        let state = self.forkchoice_state();
217        let res = self
218            .to_engine
219            .fork_choice_updated(state, None, EngineApiMessageVersion::default())
220            .await?;
221
222        if !res.is_valid() {
223            eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
224        }
225
226        Ok(())
227    }
228
229    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
230    /// through newPayload.
231    async fn advance(&mut self) -> eyre::Result<()> {
232        let res = self
233            .to_engine
234            .fork_choice_updated(
235                self.forkchoice_state(),
236                Some(self.payload_attributes_builder.build(&self.last_header)),
237                EngineApiMessageVersion::default(),
238            )
239            .await?;
240
241        if !res.is_valid() {
242            eyre::bail!("Invalid payload status")
243        }
244
245        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
246
247        let Some(Ok(payload)) =
248            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
249        else {
250            eyre::bail!("No payload")
251        };
252
253        let header = payload.block().sealed_header().clone();
254        let payload = T::block_to_payload(payload.block().clone());
255        let res = self.to_engine.new_payload(payload).await?;
256
257        if !res.is_valid() {
258            eyre::bail!("Invalid payload")
259        }
260
261        self.last_block_hashes.push_back(header.hash());
262        self.last_header = header;
263        // ensure we keep at most 64 blocks
264        if self.last_block_hashes.len() > 64 {
265            self.last_block_hashes.pop_front();
266        }
267
268        Ok(())
269    }
270}