Skip to main content

reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
10use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
11use reth_storage_api::BlockReader;
12use reth_transaction_pool::TransactionPool;
13use std::{
14    collections::VecDeque,
15    fmt,
16    future::Future,
17    pin::Pin,
18    task::{Context, Poll},
19    time::Duration,
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25/// A mining mode for the local dev engine.
26pub enum MiningMode<Pool: TransactionPool + Unpin> {
27    /// In this mode a block is built as soon as
28    /// a valid transaction reaches the pool.
29    /// If `max_transactions` is set, a block is built when that many transactions have
30    /// accumulated.
31    Instant {
32        /// The transaction pool.
33        pool: Pool,
34        /// Stream of transaction notifications.
35        rx: Fuse<ReceiverStream<TxHash>>,
36        /// Maximum number of transactions to accumulate before mining a block.
37        /// If None, mine immediately when any transaction arrives.
38        max_transactions: Option<usize>,
39        /// Counter for accumulated transactions (only used when `max_transactions` is set).
40        accumulated: usize,
41    },
42    /// In this mode a block is built at a fixed interval.
43    Interval(Interval),
44    /// In this mode a block is built when the trigger stream yields a value.
45    ///
46    /// This is a general-purpose trigger that can be fired on demand, for example via a channel
47    /// or any other [`Stream`] implementation.
48    Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
49}
50
51impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        match self {
54            Self::Instant { max_transactions, accumulated, .. } => f
55                .debug_struct("Instant")
56                .field("max_transactions", max_transactions)
57                .field("accumulated", accumulated)
58                .finish(),
59            Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
60            Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
61        }
62    }
63}
64
65impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
66    /// Constructor for a [`MiningMode::Instant`]
67    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
68        let rx = pool.pending_transactions_listener();
69        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
70    }
71
72    /// Constructor for a [`MiningMode::Interval`]
73    pub fn interval(duration: Duration) -> Self {
74        let start = tokio::time::Instant::now() + duration;
75        Self::Interval(tokio::time::interval_at(start, duration))
76    }
77
78    /// Constructor for a [`MiningMode::Trigger`]
79    ///
80    /// Accepts any stream that yields `()` values, each of which triggers a new block to be
81    /// mined. This can be backed by a channel, a custom stream, or any other async source.
82    pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
83        Self::Trigger(Box::pin(trigger))
84    }
85}
86
87impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
88    type Output = ();
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91        let this = self.get_mut();
92        match this {
93            Self::Instant { pool, rx, max_transactions, accumulated } => {
94                // Poll for new transaction notifications
95                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
96                    if pool.pending_and_queued_txn_count().0 == 0 {
97                        continue;
98                    }
99                    if let Some(max_tx) = max_transactions {
100                        *accumulated += 1;
101                        // If we've reached the max transactions threshold, mine a block
102                        if *accumulated >= *max_tx {
103                            *accumulated = 0; // Reset counter for next block
104                            return Poll::Ready(());
105                        }
106                    } else {
107                        // If no max_transactions is set, mine immediately
108                        return Poll::Ready(());
109                    }
110                }
111                Poll::Pending
112            }
113            Self::Interval(interval) => {
114                if interval.poll_tick(cx).is_ready() {
115                    return Poll::Ready(())
116                }
117                Poll::Pending
118            }
119            Self::Trigger(trigger) => {
120                if trigger.poll_next_unpin(cx).is_ready() {
121                    return Poll::Ready(())
122                }
123                Poll::Pending
124            }
125        }
126    }
127}
128
129/// Local miner advancing the chain
130#[derive(Debug)]
131pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
132    /// The payload attribute builder for the engine
133    payload_attributes_builder: B,
134    /// Sender for events to engine.
135    to_engine: ConsensusEngineHandle<T>,
136    /// The mining mode for the engine
137    mode: MiningMode<Pool>,
138    /// The payload builder for the engine
139    payload_builder: PayloadBuilderHandle<T>,
140    /// Latest block in the chain so far.
141    last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
142    /// Stores latest mined blocks.
143    last_block_hashes: VecDeque<B256>,
144    /// Optional sleep duration between initiating payload building and resolving.
145    ///
146    /// When set, the miner sleeps after `fork_choice_updated` before calling
147    /// `resolve_kind`, giving the payload job time for multiple rebuild attempts.
148    payload_wait_time: Option<Duration>,
149}
150
151impl<T, B, Pool> LocalMiner<T, B, Pool>
152where
153    T: PayloadTypes,
154    B: PayloadAttributesBuilder<
155        T::PayloadAttributes,
156        HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
157    >,
158    Pool: TransactionPool + Unpin,
159{
160    /// Spawns a new [`LocalMiner`] with the given parameters.
161    pub fn new(
162        provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
163        payload_attributes_builder: B,
164        to_engine: ConsensusEngineHandle<T>,
165        mode: MiningMode<Pool>,
166        payload_builder: PayloadBuilderHandle<T>,
167    ) -> Self {
168        let last_header =
169            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
170
171        Self {
172            payload_attributes_builder,
173            to_engine,
174            mode,
175            payload_builder,
176            last_block_hashes: VecDeque::from([last_header.hash()]),
177            last_header,
178            payload_wait_time: None,
179        }
180    }
181
182    /// Sets the payload wait time, if any.
183    pub const fn with_payload_wait_time_opt(mut self, wait_time: Option<Duration>) -> Self {
184        self.payload_wait_time = wait_time;
185        self
186    }
187
188    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
189    pub async fn run(mut self) {
190        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
191        loop {
192            tokio::select! {
193                // Wait for the interval or the pool to receive a transaction
194                _ = &mut self.mode => {
195                    if let Err(e) = self.advance().await {
196                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
197                    }
198                }
199                // send FCU once in a while
200                _ = fcu_interval.tick() => {
201                    if let Err(e) = self.update_forkchoice_state().await {
202                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
203                    }
204                }
205            }
206        }
207    }
208
209    /// Returns current forkchoice state.
210    fn forkchoice_state(&self) -> ForkchoiceState {
211        ForkchoiceState {
212            head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
213            safe_block_hash: *self
214                .last_block_hashes
215                .get(self.last_block_hashes.len().saturating_sub(32))
216                .expect("at least 1 block exists"),
217            finalized_block_hash: *self
218                .last_block_hashes
219                .get(self.last_block_hashes.len().saturating_sub(64))
220                .expect("at least 1 block exists"),
221        }
222    }
223
224    /// Sends a FCU to the engine.
225    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
226        let state = self.forkchoice_state();
227        let res = self.to_engine.fork_choice_updated(state, None).await?;
228
229        if !res.is_valid() {
230            eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
231        }
232
233        Ok(())
234    }
235
236    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
237    /// through newPayload.
238    async fn advance(&mut self) -> eyre::Result<()> {
239        let res = self
240            .to_engine
241            .fork_choice_updated(
242                self.forkchoice_state(),
243                Some(self.payload_attributes_builder.build(&self.last_header)),
244            )
245            .await?;
246
247        if !res.is_valid() {
248            eyre::bail!("Invalid payload status")
249        }
250
251        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
252
253        if let Some(wait_time) = self.payload_wait_time {
254            tokio::time::sleep(wait_time).await;
255        }
256
257        let Some(Ok(payload)) =
258            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
259        else {
260            eyre::bail!("No payload")
261        };
262
263        let header = payload.block().sealed_header().clone();
264        let res = self.to_engine.new_payload(payload.into()).await?;
265
266        if !res.is_valid() {
267            eyre::bail!("Invalid payload")
268        }
269
270        self.last_block_hashes.push_back(header.hash());
271        self.last_header = header;
272        // ensure we keep at most 64 blocks
273        if self.last_block_hashes.len() > 64 {
274            self.last_block_hashes.pop_front();
275        }
276
277        Ok(())
278    }
279}