reth_engine_local/
miner.rs1use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::ConsensusEngineHandle;
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11 BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16 collections::VecDeque,
17 future::Future,
18 pin::Pin,
19 task::{Context, Poll},
20 time::{Duration, UNIX_EPOCH},
21};
22use tokio::time::Interval;
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::error;
25
26#[derive(Debug)]
28pub enum MiningMode<Pool: TransactionPool + Unpin> {
29 Instant {
34 pool: Pool,
36 rx: Fuse<ReceiverStream<TxHash>>,
38 max_transactions: Option<usize>,
41 accumulated: usize,
43 },
44 Interval(Interval),
46}
47
48impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
49 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
51 let rx = pool.pending_transactions_listener();
52 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
53 }
54
55 pub fn interval(duration: Duration) -> Self {
57 let start = tokio::time::Instant::now() + duration;
58 Self::Interval(tokio::time::interval_at(start, duration))
59 }
60}
61
62impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
63 type Output = ();
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let this = self.get_mut();
67 match this {
68 Self::Instant { pool, rx, max_transactions, accumulated } => {
69 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
71 if pool.pending_and_queued_txn_count().0 == 0 {
72 continue;
73 }
74 if let Some(max_tx) = max_transactions {
75 *accumulated += 1;
76 if *accumulated >= *max_tx {
78 *accumulated = 0; return Poll::Ready(());
80 }
81 } else {
82 return Poll::Ready(());
84 }
85 }
86 Poll::Pending
87 }
88 Self::Interval(interval) => {
89 if interval.poll_tick(cx).is_ready() {
90 return Poll::Ready(())
91 }
92 Poll::Pending
93 }
94 }
95 }
96}
97
98#[derive(Debug)]
100pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
101 payload_attributes_builder: B,
103 to_engine: ConsensusEngineHandle<T>,
105 mode: MiningMode<Pool>,
107 payload_builder: PayloadBuilderHandle<T>,
109 last_timestamp: u64,
111 last_block_hashes: VecDeque<B256>,
113}
114
115impl<T, B, Pool> LocalMiner<T, B, Pool>
116where
117 T: PayloadTypes,
118 B: PayloadAttributesBuilder<<T as PayloadTypes>::PayloadAttributes>,
119 Pool: TransactionPool + Unpin,
120{
121 pub fn new(
123 provider: impl BlockReader,
124 payload_attributes_builder: B,
125 to_engine: ConsensusEngineHandle<T>,
126 mode: MiningMode<Pool>,
127 payload_builder: PayloadBuilderHandle<T>,
128 ) -> Self {
129 let latest_header =
130 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
131
132 Self {
133 payload_attributes_builder,
134 to_engine,
135 mode,
136 payload_builder,
137 last_timestamp: latest_header.timestamp(),
138 last_block_hashes: VecDeque::from([latest_header.hash()]),
139 }
140 }
141
142 pub async fn run(mut self) {
144 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
145 loop {
146 tokio::select! {
147 _ = &mut self.mode => {
149 if let Err(e) = self.advance().await {
150 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
151 }
152 }
153 _ = fcu_interval.tick() => {
155 if let Err(e) = self.update_forkchoice_state().await {
156 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
157 }
158 }
159 }
160 }
161 }
162
163 fn forkchoice_state(&self) -> ForkchoiceState {
165 ForkchoiceState {
166 head_block_hash: *self.last_block_hashes.back().expect("at least 1 block exists"),
167 safe_block_hash: *self
168 .last_block_hashes
169 .get(self.last_block_hashes.len().saturating_sub(32))
170 .expect("at least 1 block exists"),
171 finalized_block_hash: *self
172 .last_block_hashes
173 .get(self.last_block_hashes.len().saturating_sub(64))
174 .expect("at least 1 block exists"),
175 }
176 }
177
178 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
180 let state = self.forkchoice_state();
181 let res = self
182 .to_engine
183 .fork_choice_updated(state, None, EngineApiMessageVersion::default())
184 .await?;
185
186 if !res.is_valid() {
187 eyre::bail!("Invalid fork choice update {state:?}: {res:?}")
188 }
189
190 Ok(())
191 }
192
193 async fn advance(&mut self) -> eyre::Result<()> {
196 let timestamp = std::cmp::max(
197 self.last_timestamp.saturating_add(1),
198 std::time::SystemTime::now()
199 .duration_since(UNIX_EPOCH)
200 .expect("cannot be earlier than UNIX_EPOCH")
201 .as_secs(),
202 );
203
204 let res = self
205 .to_engine
206 .fork_choice_updated(
207 self.forkchoice_state(),
208 Some(self.payload_attributes_builder.build(timestamp)),
209 EngineApiMessageVersion::default(),
210 )
211 .await?;
212
213 if !res.is_valid() {
214 eyre::bail!("Invalid payload status")
215 }
216
217 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
218
219 let Some(Ok(payload)) =
220 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
221 else {
222 eyre::bail!("No payload")
223 };
224
225 let block = payload.block();
226
227 let payload = T::block_to_payload(payload.block().clone());
228 let res = self.to_engine.new_payload(payload).await?;
229
230 if !res.is_valid() {
231 eyre::bail!("Invalid payload")
232 }
233
234 self.last_timestamp = timestamp;
235 self.last_block_hashes.push_back(block.hash());
236 if self.last_block_hashes.len() > 64 {
238 self.last_block_hashes.pop_front();
239 }
240
241 Ok(())
242 }
243}