Skip to main content

reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
10use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
11use reth_storage_api::BlockReader;
12use reth_transaction_pool::TransactionPool;
13use std::{
14    collections::VecDeque,
15    fmt,
16    future::Future,
17    pin::Pin,
18    task::{Context, Poll},
19    time::Duration,
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25/// A mining mode for the local dev engine.
26pub enum MiningMode<Pool: TransactionPool + Unpin> {
27    /// In this mode a block is built as soon as
28    /// a valid transaction reaches the pool.
29    /// If `max_transactions` is set, a block is built when that many transactions have
30    /// accumulated.
31    Instant {
32        /// The transaction pool.
33        pool: Pool,
34        /// Stream of transaction notifications.
35        rx: Fuse<ReceiverStream<TxHash>>,
36        /// Maximum number of transactions to accumulate before mining a block.
37        /// If None, mine immediately when any transaction arrives.
38        max_transactions: Option<usize>,
39        /// Counter for accumulated transactions (only used when `max_transactions` is set).
40        accumulated: usize,
41    },
42    /// In this mode a block is built at a fixed interval.
43    Interval(Interval),
44    /// In this mode a block is built when the trigger stream yields a value.
45    ///
46    /// This is a general-purpose trigger that can be fired on demand, for example via a channel
47    /// or any other [`Stream`] implementation.
48    Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
49}
50
51impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        match self {
54            Self::Instant { max_transactions, accumulated, .. } => f
55                .debug_struct("Instant")
56                .field("max_transactions", max_transactions)
57                .field("accumulated", accumulated)
58                .finish(),
59            Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
60            Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
61        }
62    }
63}
64
65impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
66    /// Constructor for a [`MiningMode::Instant`]
67    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
68        let rx = pool.pending_transactions_listener();
69        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
70    }
71
72    /// Constructor for a [`MiningMode::Interval`]
73    pub fn interval(duration: Duration) -> Self {
74        let start = tokio::time::Instant::now() + duration;
75        Self::Interval(tokio::time::interval_at(start, duration))
76    }
77
78    /// Constructor for a [`MiningMode::Trigger`]
79    ///
80    /// Accepts any stream that yields `()` values, each of which triggers a new block to be
81    /// mined. This can be backed by a channel, a custom stream, or any other async source.
82    pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
83        Self::Trigger(Box::pin(trigger))
84    }
85}
86
87impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
88    type Output = ();
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91        let this = self.get_mut();
92        match this {
93            Self::Instant { pool, rx, max_transactions, accumulated } => {
94                // Poll for new transaction notifications
95                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
96                    if pool.pending_and_queued_txn_count().0 == 0 {
97                        continue;
98                    }
99                    if let Some(max_tx) = max_transactions {
100                        *accumulated += 1;
101                        // If we've reached the max transactions threshold, mine a block
102                        if *accumulated >= *max_tx {
103                            *accumulated = 0; // Reset counter for next block
104                            return Poll::Ready(());
105                        }
106                    } else {
107                        // If no max_transactions is set, mine immediately
108                        return Poll::Ready(());
109                    }
110                }
111                Poll::Pending
112            }
113            Self::Interval(interval) => {
114                if interval.poll_tick(cx).is_ready() {
115                    return Poll::Ready(())
116                }
117                Poll::Pending
118            }
119            Self::Trigger(trigger) => {
120                if trigger.poll_next_unpin(cx).is_ready() {
121                    return Poll::Ready(())
122                }
123                Poll::Pending
124            }
125        }
126    }
127}
128
129/// Local miner advancing the chain
130#[derive(Debug)]
131pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
132    /// The payload attribute builder for the engine
133    payload_attributes_builder: B,
134    /// Sender for events to engine.
135    to_engine: ConsensusEngineHandle<T>,
136    /// The mining mode for the engine
137    mode: MiningMode<Pool>,
138    /// The payload builder for the engine
139    payload_builder: PayloadBuilderHandle<T>,
140    /// Latest block in the chain so far.
141    last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
142    /// Stores latest mined blocks.
143    last_block_hashes: VecDeque<B256>,
144}
145
146impl<T, B, Pool> LocalMiner<T, B, Pool>
147where
148    T: PayloadTypes,
149    B: PayloadAttributesBuilder<
150        T::PayloadAttributes,
151        HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
152    >,
153    Pool: TransactionPool + Unpin,
154{
155    /// Spawns a new [`LocalMiner`] with the given parameters.
156    pub fn new(
157        provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
158        payload_attributes_builder: B,
159        to_engine: ConsensusEngineHandle<T>,
160        mode: MiningMode<Pool>,
161        payload_builder: PayloadBuilderHandle<T>,
162    ) -> Self {
163        let last_header =
164            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
165
166        Self {
167            payload_attributes_builder,
168            to_engine,
169            mode,
170            payload_builder,
171            last_block_hashes: VecDeque::from([last_header.hash()]),
172            last_header,
173        }
174    }
175
176    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
177    pub async fn run(mut self) {
178        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
179        loop {
180            tokio::select! {
181                // Wait for the interval or the pool to receive a transaction
182                _ = &mut self.mode => {
183                    if let Err(e) = self.advance().await {
184                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
185                    }
186                }
187                // send FCU once in a while
188                _ = fcu_interval.tick() => {
189                    if let Err(e) = self.update_forkchoice_state().await {
190                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
191                    }
192                }
193            }
194        }
195    }
196
197    /// Returns current forkchoice state.
198    fn forkchoice_state(&self) -> ForkchoiceState {
199        ForkchoiceState {
200            head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
201            safe_block_hash: *self
202                .last_block_hashes
203                .get(self.last_block_hashes.len().saturating_sub(32))
204                .expect("at least 1 block exists"),
205            finalized_block_hash: *self
206                .last_block_hashes
207                .get(self.last_block_hashes.len().saturating_sub(64))
208                .expect("at least 1 block exists"),
209        }
210    }
211
212    /// Sends a FCU to the engine.
213    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
214        let state = self.forkchoice_state();
215        let res = self.to_engine.fork_choice_updated(state, None).await?;
216
217        if !res.is_valid() {
218            eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
219        }
220
221        Ok(())
222    }
223
224    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
225    /// through newPayload.
226    async fn advance(&mut self) -> eyre::Result<()> {
227        let res = self
228            .to_engine
229            .fork_choice_updated(
230                self.forkchoice_state(),
231                Some(self.payload_attributes_builder.build(&self.last_header)),
232            )
233            .await?;
234
235        if !res.is_valid() {
236            eyre::bail!("Invalid payload status")
237        }
238
239        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
240
241        let Some(Ok(payload)) =
242            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
243        else {
244            eyre::bail!("No payload")
245        };
246
247        let header = payload.block().sealed_header().clone();
248        let payload = T::block_to_payload(payload.block().clone());
249        let res = self.to_engine.new_payload(payload).await?;
250
251        if !res.is_valid() {
252            eyre::bail!("Invalid payload")
253        }
254
255        self.last_block_hashes.push_back(header.hash());
256        self.last_header = header;
257        // ensure we keep at most 64 blocks
258        if self.last_block_hashes.len() > 64 {
259            self.last_block_hashes.pop_front();
260        }
261
262        Ok(())
263    }
264}