reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{
10    BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
11};
12use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
13use reth_storage_api::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16    collections::VecDeque,
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20    time::Duration,
21};
22use tokio::time::Interval;
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::error;
25
26/// A mining mode for the local dev engine.
27#[derive(Debug)]
28pub enum MiningMode<Pool: TransactionPool + Unpin> {
29    /// In this mode a block is built as soon as
30    /// a valid transaction reaches the pool.
31    /// If `max_transactions` is set, a block is built when that many transactions have
32    /// accumulated.
33    Instant {
34        /// The transaction pool.
35        pool: Pool,
36        /// Stream of transaction notifications.
37        rx: Fuse<ReceiverStream<TxHash>>,
38        /// Maximum number of transactions to accumulate before mining a block.
39        /// If None, mine immediately when any transaction arrives.
40        max_transactions: Option<usize>,
41        /// Counter for accumulated transactions (only used when `max_transactions` is set).
42        accumulated: usize,
43    },
44    /// In this mode a block is built at a fixed interval.
45    Interval(Interval),
46}
47
48impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
49    /// Constructor for a [`MiningMode::Instant`]
50    pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
51        let rx = pool.pending_transactions_listener();
52        Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
53    }
54
55    /// Constructor for a [`MiningMode::Interval`]
56    pub fn interval(duration: Duration) -> Self {
57        let start = tokio::time::Instant::now() + duration;
58        Self::Interval(tokio::time::interval_at(start, duration))
59    }
60}
61
62impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
63    type Output = ();
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let this = self.get_mut();
67        match this {
68            Self::Instant { pool, rx, max_transactions, accumulated } => {
69                // Poll for new transaction notifications
70                while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
71                    if pool.pending_and_queued_txn_count().0 == 0 {
72                        continue;
73                    }
74                    if let Some(max_tx) = max_transactions {
75                        *accumulated += 1;
76                        // If we've reached the max transactions threshold, mine a block
77                        if *accumulated >= *max_tx {
78                            *accumulated = 0; // Reset counter for next block
79                            return Poll::Ready(());
80                        }
81                    } else {
82                        // If no max_transactions is set, mine immediately
83                        return Poll::Ready(());
84                    }
85                }
86                Poll::Pending
87            }
88            Self::Interval(interval) => {
89                if interval.poll_tick(cx).is_ready() {
90                    return Poll::Ready(())
91                }
92                Poll::Pending
93            }
94        }
95    }
96}
97
98/// Local miner advancing the chain
99#[derive(Debug)]
100pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
101    /// The payload attribute builder for the engine
102    payload_attributes_builder: B,
103    /// Sender for events to engine.
104    to_engine: ConsensusEngineHandle<T>,
105    /// The mining mode for the engine
106    mode: MiningMode<Pool>,
107    /// The payload builder for the engine
108    payload_builder: PayloadBuilderHandle<T>,
109    /// Latest block in the chain so far.
110    last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
111    /// Stores latest mined blocks.
112    last_block_hashes: VecDeque<B256>,
113}
114
115impl<T, B, Pool> LocalMiner<T, B, Pool>
116where
117    T: PayloadTypes,
118    B: PayloadAttributesBuilder<
119        T::PayloadAttributes,
120        HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
121    >,
122    Pool: TransactionPool + Unpin,
123{
124    /// Spawns a new [`LocalMiner`] with the given parameters.
125    pub fn new(
126        provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
127        payload_attributes_builder: B,
128        to_engine: ConsensusEngineHandle<T>,
129        mode: MiningMode<Pool>,
130        payload_builder: PayloadBuilderHandle<T>,
131    ) -> Self {
132        let last_header =
133            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
134
135        Self {
136            payload_attributes_builder,
137            to_engine,
138            mode,
139            payload_builder,
140            last_block_hashes: VecDeque::from([last_header.hash()]),
141            last_header,
142        }
143    }
144
145    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
146    pub async fn run(mut self) {
147        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
148        loop {
149            tokio::select! {
150                // Wait for the interval or the pool to receive a transaction
151                _ = &mut self.mode => {
152                    if let Err(e) = self.advance().await {
153                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
154                    }
155                }
156                // send FCU once in a while
157                _ = fcu_interval.tick() => {
158                    if let Err(e) = self.update_forkchoice_state().await {
159                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
160                    }
161                }
162            }
163        }
164    }
165
166    /// Returns current forkchoice state.
167    fn forkchoice_state(&self) -> ForkchoiceState {
168        ForkchoiceState {
169            head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
170            safe_block_hash: *self
171                .last_block_hashes
172                .get(self.last_block_hashes.len().saturating_sub(32))
173                .expect("at least 1 block exists"),
174            finalized_block_hash: *self
175                .last_block_hashes
176                .get(self.last_block_hashes.len().saturating_sub(64))
177                .expect("at least 1 block exists"),
178        }
179    }
180
181    /// Sends a FCU to the engine.
182    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
183        let state = self.forkchoice_state();
184        let res = self
185            .to_engine
186            .fork_choice_updated(state, None, EngineApiMessageVersion::default())
187            .await?;
188
189        if !res.is_valid() {
190            eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
191        }
192
193        Ok(())
194    }
195
196    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
197    /// through newPayload.
198    async fn advance(&mut self) -> eyre::Result<()> {
199        let res = self
200            .to_engine
201            .fork_choice_updated(
202                self.forkchoice_state(),
203                Some(self.payload_attributes_builder.build(&self.last_header)),
204                EngineApiMessageVersion::default(),
205            )
206            .await?;
207
208        if !res.is_valid() {
209            eyre::bail!("Invalid payload status")
210        }
211
212        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
213
214        let Some(Ok(payload)) =
215            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
216        else {
217            eyre::bail!("No payload")
218        };
219
220        let header = payload.block().sealed_header().clone();
221        let payload = T::block_to_payload(payload.block().clone());
222        let res = self.to_engine.new_payload(payload).await?;
223
224        if !res.is_valid() {
225            eyre::bail!("Invalid payload")
226        }
227
228        self.last_block_hashes.push_back(header.hash());
229        self.last_header = header;
230        // ensure we keep at most 64 blocks
231        if self.last_block_hashes.len() > 64 {
232            self.last_block_hashes.pop_front();
233        }
234
235        Ok(())
236    }
237}