reth_engine_local/
miner.rs1use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, Stream, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{
10 BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
11};
12use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
13use reth_storage_api::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16 collections::VecDeque,
17 fmt,
18 future::Future,
19 pin::Pin,
20 task::{Context, Poll},
21 time::Duration,
22};
23use tokio::time::Interval;
24use tokio_stream::wrappers::ReceiverStream;
25use tracing::error;
26
27pub enum MiningMode<Pool: TransactionPool + Unpin> {
29 Instant {
34 pool: Pool,
36 rx: Fuse<ReceiverStream<TxHash>>,
38 max_transactions: Option<usize>,
41 accumulated: usize,
43 },
44 Interval(Interval),
46 Trigger(Pin<Box<dyn Stream<Item = ()> + Send + Sync>>),
51}
52
53impl<Pool: TransactionPool + Unpin> fmt::Debug for MiningMode<Pool> {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 Self::Instant { max_transactions, accumulated, .. } => f
57 .debug_struct("Instant")
58 .field("max_transactions", max_transactions)
59 .field("accumulated", accumulated)
60 .finish(),
61 Self::Interval(interval) => f.debug_tuple("Interval").field(interval).finish(),
62 Self::Trigger(_) => f.debug_tuple("Trigger").finish(),
63 }
64 }
65}
66
67impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
68 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
70 let rx = pool.pending_transactions_listener();
71 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
72 }
73
74 pub fn interval(duration: Duration) -> Self {
76 let start = tokio::time::Instant::now() + duration;
77 Self::Interval(tokio::time::interval_at(start, duration))
78 }
79
80 pub fn trigger(trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
85 Self::Trigger(Box::pin(trigger))
86 }
87}
88
89impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
90 type Output = ();
91
92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93 let this = self.get_mut();
94 match this {
95 Self::Instant { pool, rx, max_transactions, accumulated } => {
96 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
98 if pool.pending_and_queued_txn_count().0 == 0 {
99 continue;
100 }
101 if let Some(max_tx) = max_transactions {
102 *accumulated += 1;
103 if *accumulated >= *max_tx {
105 *accumulated = 0; return Poll::Ready(());
107 }
108 } else {
109 return Poll::Ready(());
111 }
112 }
113 Poll::Pending
114 }
115 Self::Interval(interval) => {
116 if interval.poll_tick(cx).is_ready() {
117 return Poll::Ready(())
118 }
119 Poll::Pending
120 }
121 Self::Trigger(trigger) => {
122 if trigger.poll_next_unpin(cx).is_ready() {
123 return Poll::Ready(())
124 }
125 Poll::Pending
126 }
127 }
128 }
129}
130
131#[derive(Debug)]
133pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
134 payload_attributes_builder: B,
136 to_engine: ConsensusEngineHandle<T>,
138 mode: MiningMode<Pool>,
140 payload_builder: PayloadBuilderHandle<T>,
142 last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
144 last_block_hashes: VecDeque<B256>,
146}
147
148impl<T, B, Pool> LocalMiner<T, B, Pool>
149where
150 T: PayloadTypes,
151 B: PayloadAttributesBuilder<
152 T::PayloadAttributes,
153 HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
154 >,
155 Pool: TransactionPool + Unpin,
156{
157 pub fn new(
159 provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
160 payload_attributes_builder: B,
161 to_engine: ConsensusEngineHandle<T>,
162 mode: MiningMode<Pool>,
163 payload_builder: PayloadBuilderHandle<T>,
164 ) -> Self {
165 let last_header =
166 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
167
168 Self {
169 payload_attributes_builder,
170 to_engine,
171 mode,
172 payload_builder,
173 last_block_hashes: VecDeque::from([last_header.hash()]),
174 last_header,
175 }
176 }
177
178 pub async fn run(mut self) {
180 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
181 loop {
182 tokio::select! {
183 _ = &mut self.mode => {
185 if let Err(e) = self.advance().await {
186 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
187 }
188 }
189 _ = fcu_interval.tick() => {
191 if let Err(e) = self.update_forkchoice_state().await {
192 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
193 }
194 }
195 }
196 }
197 }
198
199 fn forkchoice_state(&self) -> ForkchoiceState {
201 ForkchoiceState {
202 head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
203 safe_block_hash: *self
204 .last_block_hashes
205 .get(self.last_block_hashes.len().saturating_sub(32))
206 .expect("at least 1 block exists"),
207 finalized_block_hash: *self
208 .last_block_hashes
209 .get(self.last_block_hashes.len().saturating_sub(64))
210 .expect("at least 1 block exists"),
211 }
212 }
213
214 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
216 let state = self.forkchoice_state();
217 let res = self
218 .to_engine
219 .fork_choice_updated(state, None, EngineApiMessageVersion::default())
220 .await?;
221
222 if !res.is_valid() {
223 eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
224 }
225
226 Ok(())
227 }
228
229 async fn advance(&mut self) -> eyre::Result<()> {
232 let res = self
233 .to_engine
234 .fork_choice_updated(
235 self.forkchoice_state(),
236 Some(self.payload_attributes_builder.build(&self.last_header)),
237 EngineApiMessageVersion::default(),
238 )
239 .await?;
240
241 if !res.is_valid() {
242 eyre::bail!("Invalid payload status")
243 }
244
245 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
246
247 let Some(Ok(payload)) =
248 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
249 else {
250 eyre::bail!("No payload")
251 };
252
253 let header = payload.block().sealed_header().clone();
254 let payload = T::block_to_payload(payload.block().clone());
255 let res = self.to_engine.new_payload(payload).await?;
256
257 if !res.is_valid() {
258 eyre::bail!("Invalid payload")
259 }
260
261 self.last_block_hashes.push_back(header.hash());
262 self.last_header = header;
263 if self.last_block_hashes.len() > 64 {
265 self.last_block_hashes.pop_front();
266 }
267
268 Ok(())
269 }
270}