reth_engine_local/
miner.rs1use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
10use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
11use reth_storage_api::BlockReader;
12use reth_transaction_pool::TransactionPool;
13use std::{
14 collections::VecDeque,
15 fmt,
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll},
19 time::Duration,
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25pub enum MiningMode<Pool: TransactionPool + Unpin> {
27 Instant {
32 pool: Pool,
34 rx: Fuse<ReceiverStream<TxHash>>,
36 max_transactions: Option<usize>,
39 accumulated: usize,
41 },
42 Interval(Interval),
44 Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
49}
50
51impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match self {
54 Self::Instant { max_transactions, accumulated, .. } => f
55 .debug_struct("Instant")
56 .field("max_transactions", max_transactions)
57 .field("accumulated", accumulated)
58 .finish(),
59 Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
60 Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
61 }
62 }
63}
64
65impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
66 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
68 let rx = pool.pending_transactions_listener();
69 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
70 }
71
72 pub fn interval(duration: Duration) -> Self {
74 let start = tokio::time::Instant::now() + duration;
75 Self::Interval(tokio::time::interval_at(start, duration))
76 }
77
78 pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
83 Self::Trigger(Box::pin(trigger))
84 }
85}
86
87impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
88 type Output = ();
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91 let this = self.get_mut();
92 match this {
93 Self::Instant { pool, rx, max_transactions, accumulated } => {
94 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
96 if pool.pending_and_queued_txn_count().0 == 0 {
97 continue;
98 }
99 if let Some(max_tx) = max_transactions {
100 *accumulated += 1;
101 if *accumulated >= *max_tx {
103 *accumulated = 0; return Poll::Ready(());
105 }
106 } else {
107 return Poll::Ready(());
109 }
110 }
111 Poll::Pending
112 }
113 Self::Interval(interval) => {
114 if interval.poll_tick(cx).is_ready() {
115 return Poll::Ready(())
116 }
117 Poll::Pending
118 }
119 Self::Trigger(trigger) => {
120 if trigger.poll_next_unpin(cx).is_ready() {
121 return Poll::Ready(())
122 }
123 Poll::Pending
124 }
125 }
126 }
127}
128
129#[derive(Debug)]
131pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
132 payload_attributes_builder: B,
134 to_engine: ConsensusEngineHandle<T>,
136 mode: MiningMode<Pool>,
138 payload_builder: PayloadBuilderHandle<T>,
140 last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
142 last_block_hashes: VecDeque<B256>,
144 payload_wait_time: Option<Duration>,
149}
150
151impl<T, B, Pool> LocalMiner<T, B, Pool>
152where
153 T: PayloadTypes,
154 B: PayloadAttributesBuilder<
155 T::PayloadAttributes,
156 HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
157 >,
158 Pool: TransactionPool + Unpin,
159{
160 pub fn new(
162 provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
163 payload_attributes_builder: B,
164 to_engine: ConsensusEngineHandle<T>,
165 mode: MiningMode<Pool>,
166 payload_builder: PayloadBuilderHandle<T>,
167 ) -> Self {
168 let last_header =
169 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
170
171 Self {
172 payload_attributes_builder,
173 to_engine,
174 mode,
175 payload_builder,
176 last_block_hashes: VecDeque::from([last_header.hash()]),
177 last_header,
178 payload_wait_time: None,
179 }
180 }
181
182 pub const fn with_payload_wait_time_opt(mut self, wait_time: Option<Duration>) -> Self {
184 self.payload_wait_time = wait_time;
185 self
186 }
187
188 pub async fn run(mut self) {
190 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
191 loop {
192 tokio::select! {
193 _ = &mut self.mode => {
195 if let Err(e) = self.advance().await {
196 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
197 }
198 }
199 _ = fcu_interval.tick() => {
201 if let Err(e) = self.update_forkchoice_state().await {
202 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
203 }
204 }
205 }
206 }
207 }
208
209 fn forkchoice_state(&self) -> ForkchoiceState {
211 ForkchoiceState {
212 head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
213 safe_block_hash: *self
214 .last_block_hashes
215 .get(self.last_block_hashes.len().saturating_sub(32))
216 .expect("at least 1 block exists"),
217 finalized_block_hash: *self
218 .last_block_hashes
219 .get(self.last_block_hashes.len().saturating_sub(64))
220 .expect("at least 1 block exists"),
221 }
222 }
223
224 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
226 let state = self.forkchoice_state();
227 let res = self.to_engine.fork_choice_updated(state, None).await?;
228
229 if !res.is_valid() {
230 eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
231 }
232
233 Ok(())
234 }
235
236 async fn advance(&mut self) -> eyre::Result<()> {
239 let res = self
240 .to_engine
241 .fork_choice_updated(
242 self.forkchoice_state(),
243 Some(self.payload_attributes_builder.build(&self.last_header)),
244 )
245 .await?;
246
247 if !res.is_valid() {
248 eyre::bail!("Invalid payload status")
249 }
250
251 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
252
253 if let Some(wait_time) = self.payload_wait_time {
254 tokio::time::sleep(wait_time).await;
255 }
256
257 let Some(Ok(payload)) =
258 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
259 else {
260 eyre::bail!("No payload")
261 };
262
263 let header = payload.block().sealed_header().clone();
264 let res = self.to_engine.new_payload(payload.into()).await?;
265
266 if !res.is_valid() {
267 eyre::bail!("Invalid payload")
268 }
269
270 self.last_block_hashes.push_back(header.hash());
271 self.last_header = header;
272 if self.last_block_hashes.len() > 64 {
274 self.last_block_hashes.pop_front();
275 }
276
277 Ok(())
278 }
279}