reth_engine_local/
miner.rs
1use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11 BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll},
19 time::{Duration, UNIX_EPOCH},
20};
21use tokio::{
22 sync::{mpsc::UnboundedSender, oneshot},
23 time::Interval,
24};
25use tokio_stream::wrappers::ReceiverStream;
26use tracing::error;
27
28#[derive(Debug)]
30pub enum MiningMode {
31 Instant(Fuse<ReceiverStream<TxHash>>),
34 Interval(Interval),
36}
37
38impl MiningMode {
39 pub fn instant<Pool: TransactionPool>(pool: Pool) -> Self {
41 let rx = pool.pending_transactions_listener();
42 Self::Instant(ReceiverStream::new(rx).fuse())
43 }
44
45 pub fn interval(duration: Duration) -> Self {
47 let start = tokio::time::Instant::now() + duration;
48 Self::Interval(tokio::time::interval_at(start, duration))
49 }
50}
51
52impl Future for MiningMode {
53 type Output = ();
54
55 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56 let this = self.get_mut();
57 match this {
58 Self::Instant(rx) => {
59 if let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
61 return Poll::Ready(())
62 }
63 Poll::Pending
64 }
65 Self::Interval(interval) => {
66 if interval.poll_tick(cx).is_ready() {
67 return Poll::Ready(())
68 }
69 Poll::Pending
70 }
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct LocalMiner<EngineT: EngineTypes, B> {
78 payload_attributes_builder: B,
80 to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
82 mode: MiningMode,
84 payload_builder: PayloadBuilderHandle<EngineT>,
86 last_timestamp: u64,
88 last_block_hashes: Vec<B256>,
90}
91
92impl<EngineT, B> LocalMiner<EngineT, B>
93where
94 EngineT: EngineTypes,
95 B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
96{
97 pub fn spawn_new(
99 provider: impl BlockReader,
100 payload_attributes_builder: B,
101 to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
102 mode: MiningMode,
103 payload_builder: PayloadBuilderHandle<EngineT>,
104 ) {
105 let latest_header =
106 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
107
108 let miner = Self {
109 payload_attributes_builder,
110 to_engine,
111 mode,
112 payload_builder,
113 last_timestamp: latest_header.timestamp(),
114 last_block_hashes: vec![latest_header.hash()],
115 };
116
117 tokio::spawn(miner.run());
119 }
120
121 async fn run(mut self) {
123 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
124 loop {
125 tokio::select! {
126 _ = &mut self.mode => {
128 if let Err(e) = self.advance().await {
129 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
130 }
131 }
132 _ = fcu_interval.tick() => {
134 if let Err(e) = self.update_forkchoice_state().await {
135 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
136 }
137 }
138 }
139 }
140 }
141
142 fn forkchoice_state(&self) -> ForkchoiceState {
144 ForkchoiceState {
145 head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
146 safe_block_hash: *self
147 .last_block_hashes
148 .get(self.last_block_hashes.len().saturating_sub(32))
149 .expect("at least 1 block exists"),
150 finalized_block_hash: *self
151 .last_block_hashes
152 .get(self.last_block_hashes.len().saturating_sub(64))
153 .expect("at least 1 block exists"),
154 }
155 }
156
157 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
159 let (tx, rx) = oneshot::channel();
160 self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
161 state: self.forkchoice_state(),
162 payload_attrs: None,
163 tx,
164 version: EngineApiMessageVersion::default(),
165 })?;
166
167 let res = rx.await??;
168 if !res.forkchoice_status().is_valid() {
169 eyre::bail!("Invalid fork choice update")
170 }
171
172 Ok(())
173 }
174
175 async fn advance(&mut self) -> eyre::Result<()> {
178 let timestamp = std::cmp::max(
179 self.last_timestamp + 1,
180 std::time::SystemTime::now()
181 .duration_since(UNIX_EPOCH)
182 .expect("cannot be earlier than UNIX_EPOCH")
183 .as_secs(),
184 );
185
186 let (tx, rx) = oneshot::channel();
187 self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
188 state: self.forkchoice_state(),
189 payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
190 tx,
191 version: EngineApiMessageVersion::default(),
192 })?;
193
194 let res = rx.await??.await?;
195 if !res.payload_status.is_valid() {
196 eyre::bail!("Invalid payload status")
197 }
198
199 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
200
201 let Some(Ok(payload)) =
202 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
203 else {
204 eyre::bail!("No payload")
205 };
206
207 let block = payload.block();
208
209 let (tx, rx) = oneshot::channel();
210 let payload = EngineT::block_to_payload(payload.block().clone());
211 self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx })?;
212
213 let res = rx.await??;
214
215 if !res.is_valid() {
216 eyre::bail!("Invalid payload")
217 }
218
219 self.last_timestamp = timestamp;
220 self.last_block_hashes.push(block.hash());
221 if self.last_block_hashes.len() > 64 {
223 self.last_block_hashes =
224 self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
225 }
226
227 Ok(())
228 }
229}