reth_engine_local/
miner.rs1use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
10use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
11use reth_storage_api::BlockReader;
12use reth_transaction_pool::TransactionPool;
13use std::{
14 collections::VecDeque,
15 fmt,
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll},
19 time::Duration,
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25pub enum MiningMode<Pool: TransactionPool + Unpin> {
27 Instant {
32 pool: Pool,
34 rx: Fuse<ReceiverStream<TxHash>>,
36 max_transactions: Option<usize>,
39 accumulated: usize,
41 },
42 Interval(Interval),
44 Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
49}
50
51impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match self {
54 Self::Instant { max_transactions, accumulated, .. } => f
55 .debug_struct("Instant")
56 .field("max_transactions", max_transactions)
57 .field("accumulated", accumulated)
58 .finish(),
59 Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
60 Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
61 }
62 }
63}
64
65impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
66 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
68 let rx = pool.pending_transactions_listener();
69 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
70 }
71
72 pub fn interval(duration: Duration) -> Self {
74 let start = tokio::time::Instant::now() + duration;
75 Self::Interval(tokio::time::interval_at(start, duration))
76 }
77
78 pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
83 Self::Trigger(Box::pin(trigger))
84 }
85}
86
87impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
88 type Output = ();
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91 let this = self.get_mut();
92 match this {
93 Self::Instant { pool, rx, max_transactions, accumulated } => {
94 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
96 if pool.pending_and_queued_txn_count().0 == 0 {
97 continue;
98 }
99 if let Some(max_tx) = max_transactions {
100 *accumulated += 1;
101 if *accumulated >= *max_tx {
103 *accumulated = 0; return Poll::Ready(());
105 }
106 } else {
107 return Poll::Ready(());
109 }
110 }
111 Poll::Pending
112 }
113 Self::Interval(interval) => {
114 if interval.poll_tick(cx).is_ready() {
115 return Poll::Ready(())
116 }
117 Poll::Pending
118 }
119 Self::Trigger(trigger) => {
120 if trigger.poll_next_unpin(cx).is_ready() {
121 return Poll::Ready(())
122 }
123 Poll::Pending
124 }
125 }
126 }
127}
128
129#[derive(Debug)]
131pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
132 payload_attributes_builder: B,
134 to_engine: ConsensusEngineHandle<T>,
136 mode: MiningMode<Pool>,
138 payload_builder: PayloadBuilderHandle<T>,
140 last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
142 last_block_hashes: VecDeque<B256>,
144}
145
146impl<T, B, Pool> LocalMiner<T, B, Pool>
147where
148 T: PayloadTypes,
149 B: PayloadAttributesBuilder<
150 T::PayloadAttributes,
151 HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
152 >,
153 Pool: TransactionPool + Unpin,
154{
155 pub fn new(
157 provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
158 payload_attributes_builder: B,
159 to_engine: ConsensusEngineHandle<T>,
160 mode: MiningMode<Pool>,
161 payload_builder: PayloadBuilderHandle<T>,
162 ) -> Self {
163 let last_header =
164 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
165
166 Self {
167 payload_attributes_builder,
168 to_engine,
169 mode,
170 payload_builder,
171 last_block_hashes: VecDeque::from([last_header.hash()]),
172 last_header,
173 }
174 }
175
176 pub async fn run(mut self) {
178 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
179 loop {
180 tokio::select! {
181 _ = &mut self.mode => {
183 if let Err(e) = self.advance().await {
184 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
185 }
186 }
187 _ = fcu_interval.tick() => {
189 if let Err(e) = self.update_forkchoice_state().await {
190 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
191 }
192 }
193 }
194 }
195 }
196
197 fn forkchoice_state(&self) -> ForkchoiceState {
199 ForkchoiceState {
200 head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
201 safe_block_hash: *self
202 .last_block_hashes
203 .get(self.last_block_hashes.len().saturating_sub(32))
204 .expect("at least 1 block exists"),
205 finalized_block_hash: *self
206 .last_block_hashes
207 .get(self.last_block_hashes.len().saturating_sub(64))
208 .expect("at least 1 block exists"),
209 }
210 }
211
212 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
214 let state = self.forkchoice_state();
215 let res = self.to_engine.fork_choice_updated(state, None).await?;
216
217 if !res.is_valid() {
218 eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
219 }
220
221 Ok(())
222 }
223
224 async fn advance(&mut self) -> eyre::Result<()> {
227 let res = self
228 .to_engine
229 .fork_choice_updated(
230 self.forkchoice_state(),
231 Some(self.payload_attributes_builder.build(&self.last_header)),
232 )
233 .await?;
234
235 if !res.is_valid() {
236 eyre::bail!("Invalid payload status")
237 }
238
239 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
240
241 let Some(Ok(payload)) =
242 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
243 else {
244 eyre::bail!("No payload")
245 };
246
247 let header = payload.block().sealed_header().clone();
248 let payload = T::block_to_payload(payload.block().clone());
249 let res = self.to_engine.new_payload(payload).await?;
250
251 if !res.is_valid() {
252 eyre::bail!("Invalid payload")
253 }
254
255 self.last_block_hashes.push_back(header.hash());
256 self.last_header = header;
257 if self.last_block_hashes.len() > 64 {
259 self.last_block_hashes.pop_front();
260 }
261
262 Ok(())
263 }
264}