reth_e2e_test_utils/testsuite/
actions.rs
1use crate::testsuite::Environment;
4use alloy_primitives::{Bytes, B256};
5use alloy_rpc_types_engine::{
6 ExecutionPayloadV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
7};
8use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
9use eyre::Result;
10use futures_util::future::BoxFuture;
11use reth_node_api::{EngineTypes, PayloadTypes};
12use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
13use std::{future::Future, marker::PhantomData, time::Duration};
14use tokio::time::sleep;
15use tracing::debug;
16
17pub trait Action<I>: Send + 'static {
23 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>>;
25}
26
27#[expect(missing_debug_implementations)]
29pub struct ActionBox<I>(Box<dyn Action<I>>);
30
31impl<I: 'static> ActionBox<I> {
32 pub fn new<A: Action<I>>(action: A) -> Self {
34 Self(Box::new(action))
35 }
36
37 pub async fn execute(mut self, env: &mut Environment<I>) -> Result<()> {
39 self.0.execute(env).await
40 }
41}
42
43impl<I, F, Fut> Action<I> for F
48where
49 F: FnMut(&Environment<I>) -> Fut + Send + 'static,
50 Fut: Future<Output = Result<()>> + Send + 'static,
51{
52 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
53 Box::pin(self(env))
54 }
55}
56
57#[derive(Debug)]
60pub struct AssertMineBlock<Engine>
61where
62 Engine: PayloadTypes,
63{
64 pub node_idx: usize,
66 pub transactions: Vec<Bytes>,
68 pub expected_hash: Option<B256>,
70 pub payload_attributes: Engine::PayloadAttributes,
73 _phantom: PhantomData<Engine>,
75}
76
77impl<Engine> AssertMineBlock<Engine>
78where
79 Engine: PayloadTypes,
80{
81 pub fn new(
83 node_idx: usize,
84 transactions: Vec<Bytes>,
85 expected_hash: Option<B256>,
86 payload_attributes: Engine::PayloadAttributes,
87 ) -> Self {
88 Self {
89 node_idx,
90 transactions,
91 expected_hash,
92 payload_attributes,
93 _phantom: Default::default(),
94 }
95 }
96}
97
98impl<Engine> Action<Engine> for AssertMineBlock<Engine>
99where
100 Engine: EngineTypes,
101{
102 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
103 Box::pin(async move {
104 if self.node_idx >= env.node_clients.len() {
105 return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
106 }
107
108 let node_client = &env.node_clients[self.node_idx];
109 let rpc_client = &node_client.rpc;
110 let engine_client = &node_client.engine;
111
112 let latest_block =
114 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
115 rpc_client,
116 alloy_eips::BlockNumberOrTag::Latest,
117 false,
118 )
119 .await?;
120
121 let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
122 let parent_hash = latest_block.header.hash;
123
124 debug!("Latest block hash: {parent_hash}");
125
126 let fork_choice_state = ForkchoiceState {
128 head_block_hash: parent_hash,
129 safe_block_hash: parent_hash,
130 finalized_block_hash: parent_hash,
131 };
132
133 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
134 engine_client,
135 fork_choice_state,
136 Some(self.payload_attributes.clone()),
137 )
138 .await?;
139
140 debug!("FCU result: {:?}", fcu_result);
141
142 match fcu_result.payload_status.status {
144 PayloadStatusEnum::Valid => {
145 if let Some(payload_id) = fcu_result.payload_id {
146 debug!("Got payload ID: {payload_id}");
147
148 let _engine_payload =
150 EngineApiClient::<Engine>::get_payload_v2(engine_client, payload_id)
151 .await?;
152 Ok(())
153 } else {
154 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
155 }
156 }
157 _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
158 }
159 })
160 }
161}
162#[derive(Debug, Default)]
164pub struct PickNextBlockProducer {}
165
166impl PickNextBlockProducer {
167 pub const fn new() -> Self {
169 Self {}
170 }
171}
172
173impl<Engine> Action<Engine> for PickNextBlockProducer
174where
175 Engine: EngineTypes,
176{
177 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
178 Box::pin(async move {
179 let num_clients = env.node_clients.len();
180 if num_clients == 0 {
181 return Err(eyre::eyre!("No node clients available"));
182 }
183
184 let latest_info = env
185 .latest_block_info
186 .as_ref()
187 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
188
189 let start_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
191
192 for i in 0..num_clients {
193 let idx = (start_idx + i) % num_clients;
194 let node_client = &env.node_clients[idx];
195 let rpc_client = &node_client.rpc;
196
197 let latest_block =
198 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
199 rpc_client,
200 alloy_eips::BlockNumberOrTag::Latest,
201 false,
202 )
203 .await?;
204
205 if let Some(block) = latest_block {
206 let block_number = block.header.number;
207 let block_hash = block.header.hash;
208
209 if block_hash == latest_info.hash && block_number == latest_info.number {
211 env.last_producer_idx = Some(idx);
212 debug!("Selected node {} as the next block producer", idx);
213 return Ok(());
214 }
215 }
216 }
217
218 Err(eyre::eyre!("No suitable block producer found"))
219 })
220 }
221}
222
223#[derive(Debug, Default)]
225pub struct GeneratePayloadAttributes {}
226
227impl<Engine> Action<Engine> for GeneratePayloadAttributes
228where
229 Engine: EngineTypes,
230{
231 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
232 Box::pin(async move {
233 let latest_block = env
234 .latest_block_info
235 .as_ref()
236 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
237 let block_number = latest_block.number;
238 let timestamp = env.latest_header_time + env.block_timestamp_increment;
239 let payload_attributes = alloy_rpc_types_engine::PayloadAttributes {
240 timestamp,
241 prev_randao: B256::random(),
242 suggested_fee_recipient: alloy_primitives::Address::random(),
243 withdrawals: Some(vec![]),
244 parent_beacon_block_root: Some(B256::ZERO),
245 };
246
247 env.payload_attributes.insert(latest_block.number + 1, payload_attributes);
248 debug!("Stored payload attributes for block {}", block_number + 1);
249 Ok(())
250 })
251 }
252}
253#[derive(Debug, Default)]
255pub struct GenerateNextPayload {}
256
257impl<Engine> Action<Engine> for GenerateNextPayload
258where
259 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
260 reth_node_ethereum::engine::EthPayloadAttributes:
261 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
262{
263 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
264 Box::pin(async move {
265 let latest_block = env
266 .latest_block_info
267 .as_ref()
268 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
269
270 let parent_hash = latest_block.hash;
271 debug!("Latest block hash: {parent_hash}");
272
273 let fork_choice_state = ForkchoiceState {
274 head_block_hash: parent_hash,
275 safe_block_hash: parent_hash,
276 finalized_block_hash: parent_hash,
277 };
278
279 let payload_attributes: PayloadAttributes = env
280 .payload_attributes
281 .get(&latest_block.number)
282 .cloned()
283 .ok_or_else(|| eyre::eyre!("No payload attributes found for latest block"))?;
284
285 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
286 &env.node_clients[0].engine,
287 fork_choice_state,
288 Some(payload_attributes.clone()),
289 )
290 .await?;
291
292 debug!("FCU result: {:?}", fcu_result);
293
294 let payload_id = fcu_result
295 .payload_id
296 .ok_or_else(|| eyre::eyre!("No payload ID returned from forkChoiceUpdated"))?;
297
298 debug!("Received payload ID: {:?}", payload_id);
299 env.next_payload_id = Some(payload_id);
300
301 sleep(Duration::from_secs(1)).await;
302
303 let built_payload: PayloadAttributes =
304 EngineApiClient::<Engine>::get_payload_v3(&env.node_clients[0].engine, payload_id)
305 .await?
306 .into();
307 env.payload_id_history.insert(latest_block.number + 1, payload_id);
308 env.latest_payload_built = Some(built_payload);
309
310 Ok(())
311 })
312 }
313}
314
315#[derive(Debug, Default)]
317pub struct BroadcastLatestForkchoice {}
318
319impl<Engine> Action<Engine> for BroadcastLatestForkchoice
320where
321 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
322 reth_node_ethereum::engine::EthPayloadAttributes:
323 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
324{
325 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
326 Box::pin(async move {
327 let payload = env.latest_payload_executed.clone();
328
329 if env.node_clients.is_empty() {
330 return Err(eyre::eyre!("No node clients available"));
331 }
332 let latest_block = env
333 .latest_block_info
334 .as_ref()
335 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
336
337 let parent_hash = latest_block.hash;
338 debug!("Latest block hash: {parent_hash}");
339
340 let fork_choice_state = ForkchoiceState {
341 head_block_hash: parent_hash,
342 safe_block_hash: parent_hash,
343 finalized_block_hash: parent_hash,
344 };
345 debug!(
346 "Broadcasting forkchoice update to {} clients. Head: {:?}",
347 env.node_clients.len(),
348 fork_choice_state.head_block_hash
349 );
350
351 for (idx, client) in env.node_clients.iter().enumerate() {
352 let engine_client = &client.engine;
353
354 match EngineApiClient::<Engine>::fork_choice_updated_v3(
355 engine_client,
356 fork_choice_state,
357 payload.clone(),
358 )
359 .await
360 {
361 Ok(resp) => {
362 debug!(
363 "Client {}: Forkchoice update status: {:?}",
364 idx, resp.payload_status.status
365 );
366 }
367 Err(err) => {
368 return Err(eyre::eyre!(
369 "Client {}: Failed to broadcast forkchoice: {:?}",
370 idx,
371 err
372 ));
373 }
374 }
375 }
376 debug!("Forkchoice update broadcasted successfully");
377 Ok(())
378 })
379 }
380}
381
382#[derive(Debug)]
384pub struct ProduceBlocks<Engine> {
385 pub num_blocks: u64,
387 _phantom: PhantomData<Engine>,
389}
390
391impl<Engine> ProduceBlocks<Engine> {
392 pub fn new(num_blocks: u64) -> Self {
394 Self { num_blocks, _phantom: Default::default() }
395 }
396}
397
398impl<Engine> Default for ProduceBlocks<Engine> {
399 fn default() -> Self {
400 Self::new(0)
401 }
402}
403
404impl<Engine> Action<Engine> for ProduceBlocks<Engine>
405where
406 Engine: EngineTypes,
407{
408 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
409 Box::pin(async move {
410 let mut sequence = Sequence::new(vec![
412 Box::new(PickNextBlockProducer::default()),
413 Box::new(GeneratePayloadAttributes::default()),
414 ]);
415 for _ in 0..self.num_blocks {
416 sequence.execute(env).await?;
417 }
418 Ok(())
419 })
420 }
421}
422
423#[expect(missing_debug_implementations)]
425pub struct Sequence<I> {
426 pub actions: Vec<Box<dyn Action<I>>>,
428}
429
430impl<I> Sequence<I> {
431 pub fn new(actions: Vec<Box<dyn Action<I>>>) -> Self {
433 Self { actions }
434 }
435}
436
437impl<I: Sync + Send + 'static> Action<I> for Sequence<I> {
438 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
439 Box::pin(async move {
440 for action in &mut self.actions {
442 action.execute(env).await?;
443 }
444
445 Ok(())
446 })
447 }
448}
449
450#[derive(Debug, Default)]
452pub struct BroadcastNextNewPayload {}
453
454impl<Engine> Action<Engine> for BroadcastNextNewPayload
455where
456 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
457 reth_node_ethereum::engine::EthPayloadAttributes:
458 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
459{
460 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
461 Box::pin(async move {
462 let next_new_payload = env
464 .latest_payload_built
465 .as_ref()
466 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
467 let parent_beacon_block_root = next_new_payload
468 .parent_beacon_block_root
469 .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
470
471 let mut successful_broadcast: bool = false;
473
474 for client in &env.node_clients {
475 let engine = &client.engine;
476 let rpc_client = &client.rpc;
477
478 let rpc_latest_block =
480 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
481 rpc_client,
482 alloy_eips::BlockNumberOrTag::Latest,
483 false,
484 )
485 .await?
486 .ok_or_else(|| eyre::eyre!("No latest block found from rpc"))?;
487
488 let latest_block = reth_ethereum_primitives::Block {
489 header: rpc_latest_block.header.inner,
490 body: reth_ethereum_primitives::BlockBody {
491 transactions: rpc_latest_block
492 .transactions
493 .into_transactions()
494 .map(|tx| tx.inner.into_inner().into())
495 .collect(),
496 ommers: Default::default(),
497 withdrawals: rpc_latest_block.withdrawals,
498 },
499 };
500
501 let latest_block_info = env
503 .latest_block_info
504 .as_ref()
505 .ok_or_else(|| eyre::eyre!("No latest block info found"))?;
506
507 if latest_block.header.number != latest_block_info.number {
508 return Err(eyre::eyre!(
509 "Client block number {} does not match expected block number {}",
510 latest_block.header.number,
511 latest_block_info.number
512 ));
513 }
514
515 let latest_block_parent_beacon_block_root =
517 latest_block.parent_beacon_block_root.ok_or_else(|| {
518 eyre::eyre!("No parent beacon block root for latest block")
519 })?;
520
521 if parent_beacon_block_root != latest_block_parent_beacon_block_root {
522 return Err(eyre::eyre!(
523 "Parent beacon block root mismatch: expected {:?}, got {:?}",
524 parent_beacon_block_root,
525 latest_block_parent_beacon_block_root
526 ));
527 }
528
529 let execution_payload = ExecutionPayloadV3::from_block_slow(&latest_block);
532 let result = EngineApiClient::<Engine>::new_payload_v3(
533 engine,
534 execution_payload,
535 vec![],
536 parent_beacon_block_root,
537 )
538 .await?;
539
540 if result.status == PayloadStatusEnum::Valid {
542 successful_broadcast = true;
543 env.latest_payload_executed = Some(next_new_payload.clone());
546 break;
547 } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
548 debug!(
549 "Invalid payload status returned from broadcast: {:?}",
550 validation_error
551 );
552 }
553 }
554
555 if !successful_broadcast {
556 return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
557 }
558
559 Ok(())
560 })
561 }
562}