reth_engine_local/
miner.rs1use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_engine_primitives::ConsensusEngineHandle;
9use reth_payload_builder::PayloadBuilderHandle;
10use reth_payload_primitives::{
11 BuiltPayload, EngineApiMessageVersion, PayloadAttributesBuilder, PayloadKind, PayloadTypes,
12};
13use reth_provider::BlockReader;
14use reth_transaction_pool::TransactionPool;
15use std::{
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll},
19 time::{Duration, UNIX_EPOCH},
20};
21use tokio::time::Interval;
22use tokio_stream::wrappers::ReceiverStream;
23use tracing::error;
24
25#[derive(Debug)]
27pub enum MiningMode<Pool: TransactionPool + Unpin> {
28 Instant {
33 pool: Pool,
35 rx: Fuse<ReceiverStream<TxHash>>,
37 max_transactions: Option<usize>,
40 accumulated: usize,
42 },
43 Interval(Interval),
45}
46
47impl<Pool: TransactionPool + Unpin> MiningMode<Pool> {
48 pub fn instant(pool: Pool, max_transactions: Option<usize>) -> Self {
50 let rx = pool.pending_transactions_listener();
51 Self::Instant { pool, rx: ReceiverStream::new(rx).fuse(), max_transactions, accumulated: 0 }
52 }
53
54 pub fn interval(duration: Duration) -> Self {
56 let start = tokio::time::Instant::now() + duration;
57 Self::Interval(tokio::time::interval_at(start, duration))
58 }
59}
60
61impl<Pool: TransactionPool + Unpin> Future for MiningMode<Pool> {
62 type Output = ();
63
64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65 let this = self.get_mut();
66 match this {
67 Self::Instant { pool, rx, max_transactions, accumulated } => {
68 while let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
70 if pool.pending_and_queued_txn_count().0 == 0 {
71 continue;
72 }
73 if let Some(max_tx) = max_transactions {
74 *accumulated += 1;
75 if *accumulated >= *max_tx {
77 *accumulated = 0; return Poll::Ready(());
79 }
80 } else {
81 return Poll::Ready(());
83 }
84 }
85 Poll::Pending
86 }
87 Self::Interval(interval) => {
88 if interval.poll_tick(cx).is_ready() {
89 return Poll::Ready(())
90 }
91 Poll::Pending
92 }
93 }
94 }
95}
96
97#[derive(Debug)]
99pub struct LocalMiner<T: PayloadTypes, B, Pool: TransactionPool + Unpin> {
100 payload_attributes_builder: B,
102 to_engine: ConsensusEngineHandle<T>,
104 mode: MiningMode<Pool>,
106 payload_builder: PayloadBuilderHandle<T>,
108 last_timestamp: u64,
110 last_block_hashes: Vec<B256>,
112}
113
114impl<T, B, Pool> LocalMiner<T, B, Pool>
115where
116 T: PayloadTypes,
117 B: PayloadAttributesBuilder<<T as PayloadTypes>::PayloadAttributes>,
118 Pool: TransactionPool + Unpin,
119{
120 pub fn new(
122 provider: impl BlockReader,
123 payload_attributes_builder: B,
124 to_engine: ConsensusEngineHandle<T>,
125 mode: MiningMode<Pool>,
126 payload_builder: PayloadBuilderHandle<T>,
127 ) -> Self {
128 let latest_header =
129 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
130
131 Self {
132 payload_attributes_builder,
133 to_engine,
134 mode,
135 payload_builder,
136 last_timestamp: latest_header.timestamp(),
137 last_block_hashes: vec![latest_header.hash()],
138 }
139 }
140
141 pub async fn run(mut self) {
143 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
144 loop {
145 tokio::select! {
146 _ = &mut self.mode => {
148 if let Err(e) = self.advance().await {
149 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
150 }
151 }
152 _ = fcu_interval.tick() => {
154 if let Err(e) = self.update_forkchoice_state().await {
155 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
156 }
157 }
158 }
159 }
160 }
161
162 fn forkchoice_state(&self) -> ForkchoiceState {
164 ForkchoiceState {
165 head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
166 safe_block_hash: *self
167 .last_block_hashes
168 .get(self.last_block_hashes.len().saturating_sub(32))
169 .expect("at least 1 block exists"),
170 finalized_block_hash: *self
171 .last_block_hashes
172 .get(self.last_block_hashes.len().saturating_sub(64))
173 .expect("at least 1 block exists"),
174 }
175 }
176
177 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
179 let res = self
180 .to_engine
181 .fork_choice_updated(self.forkchoice_state(), None, EngineApiMessageVersion::default())
182 .await?;
183
184 if !res.is_valid() {
185 eyre::bail!("Invalid fork choice update")
186 }
187
188 Ok(())
189 }
190
191 async fn advance(&mut self) -> eyre::Result<()> {
194 let timestamp = std::cmp::max(
195 self.last_timestamp + 1,
196 std::time::SystemTime::now()
197 .duration_since(UNIX_EPOCH)
198 .expect("cannot be earlier than UNIX_EPOCH")
199 .as_secs(),
200 );
201
202 let res = self
203 .to_engine
204 .fork_choice_updated(
205 self.forkchoice_state(),
206 Some(self.payload_attributes_builder.build(timestamp)),
207 EngineApiMessageVersion::default(),
208 )
209 .await?;
210
211 if !res.is_valid() {
212 eyre::bail!("Invalid payload status")
213 }
214
215 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
216
217 let Some(Ok(payload)) =
218 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
219 else {
220 eyre::bail!("No payload")
221 };
222
223 let block = payload.block();
224
225 let payload = T::block_to_payload(payload.block().clone());
226 let res = self.to_engine.new_payload(payload).await?;
227
228 if !res.is_valid() {
229 eyre::bail!("Invalid payload")
230 }
231
232 self.last_timestamp = timestamp;
233 self.last_block_hashes.push(block.hash());
234 if self.last_block_hashes.len() > 64 {
236 self.last_block_hashes =
237 self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
238 }
239
240 Ok(())
241 }
242}