reth_engine_local/
miner.rs1use alloy_primitives::{TxHash, B256};
4use alloy_rpc_types_engine::ForkchoiceState;
5use eyre::OptionExt;
6use futures_util::{stream::Fuse, StreamExt};
7use reth_engine_primitives::ConsensusEngineHandle;
8use reth_payload_builder::PayloadBuilderHandle;
9use reth_payload_primitives::{
10 BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
11};
12use reth_primitives_traits::{HeaderTy, SealedHeaderFor};
13use reth_storage_api::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16 collections::VecDeque,
17 future::Future,
18 pin::Pin,
19 task::{Context, Poll},
20 time::Duration,
21};
22use tokio::time::Interval;
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::error;
25
/// Trigger that decides when the local miner should produce the next block.
///
/// The type is a [`Future`]: each time it resolves, [`LocalMiner::run`] advances the chain by
/// one block and then polls it again.
#[derive(Debug)]
pub enum MiningMode<Pool: TransactionPool + Unpin> {
    /// Resolves in reaction to pending-transaction notifications from the pool.
    Instant {
        /// Transaction pool; its pending transaction count is checked on every notification so
        /// that notifications arriving while nothing is pending are skipped.
        pool: Pool,
        /// Fused stream of hashes of transactions reported as pending by the pool.
        rx: Fuse<ReceiverStream<TxHash>>,
        /// Number of notifications to accumulate before resolving. `None` resolves on the first
        /// notification seen while the pool has pending transactions.
        max_transactions: Option<usize>,
        /// Notifications counted so far towards `max_transactions`; reset to zero whenever the
        /// threshold is reached.
        accumulated: usize,
    },
    /// Resolves every time the wrapped tokio [`Interval`] ticks.
    Interval(Interval),
}
47
48impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
49 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
51 let rx = pool.pending_transactions_listener();
52 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
53 }
54
55 pub fn interval(duration: Duration) -> Self {
57 let start = tokio::time::Instant::now() + duration;
58 Self::Interval(tokio::time::interval_at(start, duration))
59 }
60}
61
62impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
63 type Output = ();
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let this = self.get_mut();
67 match this {
68 Self::Instant { pool, rx, max_transactions, accumulated } => {
69 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
71 if pool.pending_and_queued_txn_count().0 == 0 {
72 continue;
73 }
74 if let Some(max_tx) = max_transactions {
75 *accumulated += 1;
76 if *accumulated >= *max_tx {
78 *accumulated = 0; return Poll::Ready(());
80 }
81 } else {
82 return Poll::Ready(());
84 }
85 }
86 Poll::Pending
87 }
88 Self::Interval(interval) => {
89 if interval.poll_tick(cx).is_ready() {
90 return Poll::Ready(())
91 }
92 Poll::Pending
93 }
94 }
95 }
96}
97
/// Local miner that builds blocks on top of its last known header and submits them to the
/// consensus engine whenever its [`MiningMode`] fires.
#[derive(Debug)]
pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
    /// Builds the payload attributes for the next block from the last known header.
    payload_attributes_builder: B,
    /// Handle used to send fork choice updates and new payloads to the engine.
    to_engine: ConsensusEngineHandle<T>,
    /// Trigger polled by `run`; each time it resolves the chain is advanced.
    mode: MiningMode<Pool>,
    /// Handle used to resolve the payload for the id returned by the fork choice update.
    payload_builder: PayloadBuilderHandle<T>,
    /// Sealed header of the most recent block this miner knows about.
    last_header: SealedHeaderFor<<T::BuiltPayload as BuiltPayload>::Primitives>,
    /// Hashes of the most recent blocks, oldest first; `advance` caps it at 64 entries.
    last_block_hashes: VecDeque<B256>,
}
114
115impl<T, B, Pool> LocalMiner<T, B, Pool>
116where
117 T: PayloadTypes,
118 B: PayloadAttributesBuilder<
119 T::PayloadAttributes,
120 HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>,
121 >,
122 Pool: TransactionPool + Unpin,
123{
124 pub fn new(
126 provider: impl BlockReader<Header = HeaderTy<<T::BuiltPayload as BuiltPayload>::Primitives>>,
127 payload_attributes_builder: B,
128 to_engine: ConsensusEngineHandle<T>,
129 mode: MiningMode<Pool>,
130 payload_builder: PayloadBuilderHandle<T>,
131 ) -> Self {
132 let last_header =
133 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
134
135 Self {
136 payload_attributes_builder,
137 to_engine,
138 mode,
139 payload_builder,
140 last_block_hashes: VecDeque::from([last_header.hash()]),
141 last_header,
142 }
143 }
144
145 pub async fn run(mut self) {
147 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
148 loop {
149 tokio::select! {
150 _ = &mut self.mode => {
152 if let Err(e) = self.advance().await {
153 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
154 }
155 }
156 _ = fcu_interval.tick() => {
158 if let Err(e) = self.update_forkchoice_state().await {
159 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
160 }
161 }
162 }
163 }
164 }
165
166 fn forkchoice_state(&self) -> ForkchoiceState {
168 ForkchoiceState {
169 head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
170 safe_block_hash: *self
171 .last_block_hashes
172 .get(self.last_block_hashes.len().saturating_sub(32))
173 .expect("at least 1 block exists"),
174 finalized_block_hash: *self
175 .last_block_hashes
176 .get(self.last_block_hashes.len().saturating_sub(64))
177 .expect("at least 1 block exists"),
178 }
179 }
180
181 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
183 let state = self.forkchoice_state();
184 let res = self
185 .to_engine
186 .fork_choice_updated(state, None, EngineApiMessageVersion::default())
187 .await?;
188
189 if !res.is_valid() {
190 eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
191 }
192
193 Ok(())
194 }
195
196 async fn advance(&mut self) -> eyre::Result<()> {
199 let res = self
200 .to_engine
201 .fork_choice_updated(
202 self.forkchoice_state(),
203 Some(self.payload_attributes_builder.build(&self.last_header)),
204 EngineApiMessageVersion::default(),
205 )
206 .await?;
207
208 if !res.is_valid() {
209 eyre::bail!("Invalid payload status")
210 }
211
212 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
213
214 let Some(Ok(payload)) =
215 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
216 else {
217 eyre::bail!("No payload")
218 };
219
220 let header = payload.block().sealed_header().clone();
221 let payload = T::block_to_payload(payload.block().clone());
222 let res = self.to_engine.new_payload(payload).await?;
223
224 if !res.is_valid() {
225 eyre::bail!("Invalid payload")
226 }
227
228 self.last_block_hashes.push_back(header.hash());
229 self.last_header = header;
230 if self.last_block_hashes.len() > 64 {
232 self.last_block_hashes.pop_front();
233 }
234
235 Ok(())
236 }
237}