1use crate::testsuite::{
4 actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5 BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9 payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26 Engine: PayloadTypes,
27{
28 pub node_idx: usize,
30 pub transactions: Vec<Bytes>,
32 pub expected_hash: Option<B256>,
34 pub payload_attributes: Engine::PayloadAttributes,
37 _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43 Engine: PayloadTypes,
44{
45 pub fn new(
47 node_idx: usize,
48 transactions: Vec<Bytes>,
49 expected_hash: Option<B256>,
50 payload_attributes: Engine::PayloadAttributes,
51 ) -> Self {
52 Self {
53 node_idx,
54 transactions,
55 expected_hash,
56 payload_attributes,
57 _phantom: Default::default(),
58 }
59 }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64 Engine: EngineTypes,
65{
66 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67 Box::pin(async move {
68 if self.node_idx >= env.node_clients.len() {
69 return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70 }
71
72 let node_client = &env.node_clients[self.node_idx];
73 let rpc_client = &node_client.rpc;
74 let engine_client = node_client.engine.http_client();
75
76 let latest_block = EthApiClient::<
78 TransactionRequest,
79 Transaction,
80 Block,
81 Receipt,
82 Header,
83 TransactionSigned,
84 >::block_by_number(
85 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86 )
87 .await?;
88
89 let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90 let parent_hash = latest_block.header.hash;
91
92 debug!("Latest block hash: {parent_hash}");
93
94 let fork_choice_state = ForkchoiceState {
96 head_block_hash: parent_hash,
97 safe_block_hash: parent_hash,
98 finalized_block_hash: parent_hash,
99 };
100
101 match EngineApiClient::<Engine>::fork_choice_updated_v2(
103 &engine_client,
104 fork_choice_state,
105 Some(self.payload_attributes.clone()),
106 )
107 .await
108 {
109 Ok(fcu_result) => {
110 debug!(?fcu_result, "FCU v2 result");
111 match fcu_result.payload_status.status {
112 PayloadStatusEnum::Valid => {
113 if let Some(payload_id) = fcu_result.payload_id {
114 debug!(id=%payload_id, "Got payload");
115 let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116 &engine_client,
117 payload_id,
118 )
119 .await?;
120 Ok(())
121 } else {
122 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123 }
124 }
125 _ => Err(eyre::eyre!(
126 "Payload status not valid: {:?}",
127 fcu_result.payload_status
128 ))?,
129 }
130 }
131 Err(_) => {
132 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134 &engine_client,
135 fork_choice_state,
136 Some(self.payload_attributes.clone()),
137 )
138 .await?;
139
140 debug!(?fcu_result, "FCU v3 result");
141 match fcu_result.payload_status.status {
142 PayloadStatusEnum::Valid => {
143 if let Some(payload_id) = fcu_result.payload_id {
144 debug!(id=%payload_id, "Got payload");
145 let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146 &engine_client,
147 payload_id,
148 )
149 .await?;
150 Ok(())
151 } else {
152 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153 }
154 }
155 _ => Err(eyre::eyre!(
156 "Payload status not valid: {:?}",
157 fcu_result.payload_status
158 )),
159 }
160 }
161 }
162 })
163 }
164}
165
166#[derive(Debug, Default)]
168pub struct PickNextBlockProducer {}
169
170impl PickNextBlockProducer {
171 pub const fn new() -> Self {
173 Self {}
174 }
175}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179 Engine: EngineTypes,
180{
181 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182 Box::pin(async move {
183 let num_clients = env.node_clients.len();
184 if num_clients == 0 {
185 return Err(eyre::eyre!("No node clients available"));
186 }
187
188 let latest_info = env
189 .current_block_info()
190 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192 let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195 env.last_producer_idx = Some(next_producer_idx);
196 debug!(
197 "Selected node {} as the next block producer for block {}",
198 next_producer_idx,
199 latest_info.number + 1
200 );
201
202 Ok(())
203 })
204 }
205}
206
207#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213 Engine: EngineTypes + PayloadTypes,
214 Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217 Box::pin(async move {
218 let latest_block = env
219 .current_block_info()
220 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221 let block_number = latest_block.number;
222 let timestamp =
223 env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224 let payload_attributes = PayloadAttributes {
225 timestamp,
226 prev_randao: B256::random(),
227 suggested_fee_recipient: alloy_primitives::Address::random(),
228 withdrawals: Some(vec![]),
229 parent_beacon_block_root: Some(B256::ZERO),
230 slot_number: None,
231 target_gas_limit: None,
232 };
233
234 env.active_node_state_mut()?
235 .payload_attributes
236 .insert(latest_block.number + 1, payload_attributes);
237 debug!("Stored payload attributes for block {}", block_number + 1);
238 Ok(())
239 })
240 }
241}
242
243#[derive(Debug, Default)]
245pub struct GenerateNextPayload {}
246
247impl<Engine> Action<Engine> for GenerateNextPayload
248where
249 Engine: EngineTypes + PayloadTypes,
250 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
251{
252 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
253 Box::pin(async move {
254 let latest_block = env
255 .current_block_info()
256 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
257
258 let parent_hash = latest_block.hash;
259 debug!("Latest block hash: {parent_hash}");
260
261 let fork_choice_state = ForkchoiceState {
262 head_block_hash: parent_hash,
263 safe_block_hash: parent_hash,
264 finalized_block_hash: parent_hash,
265 };
266
267 let payload_attributes = env
268 .active_node_state()?
269 .payload_attributes
270 .get(&(latest_block.number + 1))
271 .cloned()
272 .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
273
274 let producer_idx =
275 env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
276
277 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
278 &env.node_clients[producer_idx].engine.http_client(),
279 fork_choice_state,
280 Some(payload_attributes.clone().into()),
281 )
282 .await?;
283
284 debug!("FCU result: {:?}", fcu_result);
285
286 expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
290
291 let payload_id = if let Some(payload_id) = fcu_result.payload_id {
292 debug!("Received new payload ID: {:?}", payload_id);
293 payload_id
294 } else {
295 debug!("No payload ID returned, generating fresh payload attributes for forking");
296
297 let fresh_payload_attributes = PayloadAttributes {
298 timestamp: env.active_node_state()?.latest_header_time +
299 env.block_timestamp_increment,
300 prev_randao: B256::random(),
301 suggested_fee_recipient: alloy_primitives::Address::random(),
302 withdrawals: Some(vec![]),
303 parent_beacon_block_root: Some(B256::ZERO),
304 slot_number: None,
305 target_gas_limit: None,
306 };
307
308 let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
309 &env.node_clients[producer_idx].engine.http_client(),
310 fork_choice_state,
311 Some(fresh_payload_attributes.clone().into()),
312 )
313 .await?;
314
315 debug!("Fresh FCU result: {:?}", fresh_fcu_result);
316
317 expect_fcu_not_syncing_or_accepted(
319 &fresh_fcu_result,
320 "GenerateNextPayload (fresh)",
321 )?;
322
323 if let Some(payload_id) = fresh_fcu_result.payload_id {
324 payload_id
325 } else {
326 debug!("Engine considers the fork base already canonical, skipping payload generation");
327 return Ok(());
328 }
329 };
330
331 env.active_node_state_mut()?.next_payload_id = Some(payload_id);
332
333 sleep(Duration::from_secs(1)).await;
334
335 let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
336 &env.node_clients[producer_idx].engine.http_client(),
337 payload_id,
338 )
339 .await?;
340
341 let built_payload = payload_attributes.clone();
343 env.active_node_state_mut()?
344 .payload_id_history
345 .insert(latest_block.number + 1, payload_id);
346 env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
347 env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
348
349 Ok(())
350 })
351 }
352}
353
354#[derive(Debug, Default)]
356pub struct BroadcastLatestForkchoice {}
357
358impl<Engine> Action<Engine> for BroadcastLatestForkchoice
359where
360 Engine: EngineTypes + PayloadTypes,
361 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
362 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
363{
364 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
365 Box::pin(async move {
366 if env.node_clients.is_empty() {
367 return Err(eyre::eyre!("No node clients available"));
368 }
369
370 let head_hash = if let Some(payload_envelope) =
372 &env.active_node_state()?.latest_payload_envelope
373 {
374 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
375 payload_envelope.clone().into();
376 let new_block_hash = execution_payload_envelope
377 .execution_payload
378 .payload_inner
379 .payload_inner
380 .block_hash;
381 debug!("Using newly executed block hash as head: {new_block_hash}");
382 new_block_hash
383 } else {
384 let rpc_client = &env.node_clients[0].rpc;
386 let current_head_block = EthApiClient::<
387 TransactionRequest,
388 Transaction,
389 Block,
390 Receipt,
391 Header,
392 TransactionSigned,
393 >::block_by_number(
394 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
395 )
396 .await?
397 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
398 debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
399 current_head_block.header.hash
400 };
401
402 let fork_choice_state = ForkchoiceState {
403 head_block_hash: head_hash,
404 safe_block_hash: head_hash,
405 finalized_block_hash: head_hash,
406 };
407 debug!(
408 "Broadcasting forkchoice update to {} clients. Head: {:?}",
409 env.node_clients.len(),
410 fork_choice_state.head_block_hash
411 );
412
413 for (idx, client) in env.node_clients.iter().enumerate() {
414 match EngineApiClient::<Engine>::fork_choice_updated_v3(
415 &client.engine.http_client(),
416 fork_choice_state,
417 None,
418 )
419 .await
420 {
421 Ok(resp) => {
422 debug!(
423 "Client {}: Forkchoice update status: {:?}",
424 idx, resp.payload_status.status
425 );
426 validate_fcu_response(&resp, &format!("Client {idx}"))?;
428 }
429 Err(err) => {
430 return Err(eyre::eyre!(
431 "Client {}: Failed to broadcast forkchoice: {:?}",
432 idx,
433 err
434 ));
435 }
436 }
437 }
438 debug!("Forkchoice update broadcasted successfully");
439 Ok(())
440 })
441 }
442}
443
444#[derive(Debug, Default)]
450pub struct UpdateBlockInfo {}
451
452impl<Engine> Action<Engine> for UpdateBlockInfo
453where
454 Engine: EngineTypes,
455{
456 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
457 Box::pin(async move {
458 let rpc_client = &env.node_clients[0].rpc;
460 let latest_block = EthApiClient::<
461 TransactionRequest,
462 Transaction,
463 Block,
464 Receipt,
465 Header,
466 TransactionSigned,
467 >::block_by_number(
468 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
469 )
470 .await?
471 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
472
473 env.set_current_block_info(BlockInfo {
475 hash: latest_block.header.hash,
476 number: latest_block.header.number,
477 timestamp: latest_block.header.timestamp,
478 })?;
479
480 env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
481 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
482 latest_block.header.hash;
483
484 debug!(
485 "Updated environment to block {} (hash: {})",
486 latest_block.header.number, latest_block.header.hash
487 );
488
489 Ok(())
490 })
491 }
492}
493
494#[derive(Debug, Default)]
500pub struct UpdateBlockInfoToLatestPayload {}
501
502impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
503where
504 Engine: EngineTypes + PayloadTypes,
505 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
506{
507 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
508 Box::pin(async move {
509 let payload_envelope = env
510 .active_node_state()?
511 .latest_payload_envelope
512 .as_ref()
513 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
514
515 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
516 payload_envelope.clone().into();
517 let execution_payload = execution_payload_envelope.execution_payload;
518
519 let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
520 let block_number = execution_payload.payload_inner.payload_inner.block_number;
521 let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
522
523 env.set_current_block_info(BlockInfo {
525 hash: block_hash,
526 number: block_number,
527 timestamp: block_timestamp,
528 })?;
529
530 env.active_node_state_mut()?.latest_header_time = block_timestamp;
531 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
532
533 debug!(
534 "Updated environment to newly produced block {} (hash: {})",
535 block_number, block_hash
536 );
537
538 Ok(())
539 })
540 }
541}
542
543#[derive(Debug, Default)]
545pub struct CheckPayloadAccepted {}
546
547impl<Engine> Action<Engine> for CheckPayloadAccepted
548where
549 Engine: EngineTypes,
550 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
551{
552 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
553 Box::pin(async move {
554 let mut accepted_check: bool = false;
555
556 let latest_block = env
557 .current_block_info()
558 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
559
560 let payload_id = *env
561 .active_node_state()?
562 .payload_id_history
563 .get(&(latest_block.number + 1))
564 .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
565
566 let node_clients = env.node_clients.clone();
567 for (idx, client) in node_clients.iter().enumerate() {
568 let rpc_client = &client.rpc;
569
570 let rpc_latest_header = EthApiClient::<
572 TransactionRequest,
573 Transaction,
574 Block,
575 Receipt,
576 Header,
577 TransactionSigned,
578 >::header_by_number(
579 rpc_client, alloy_eips::BlockNumberOrTag::Latest
580 )
581 .await?
582 .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
583
584 let next_new_payload = env
586 .active_node_state()?
587 .latest_payload_built
588 .as_ref()
589 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
590
591 let built_payload = EngineApiClient::<Engine>::get_payload_v3(
592 &client.engine.http_client(),
593 payload_id,
594 )
595 .await?;
596
597 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
598 let new_payload_block_hash = execution_payload_envelope
599 .execution_payload
600 .payload_inner
601 .payload_inner
602 .block_hash;
603
604 if rpc_latest_header.hash != new_payload_block_hash {
605 debug!(
606 "Client {}: The hash is not matched: {:?} {:?}",
607 idx, rpc_latest_header.hash, new_payload_block_hash
608 );
609 continue;
610 }
611
612 if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
613 debug!(
614 "Client {}: difficulty != 0: {:?}",
615 idx, rpc_latest_header.inner.difficulty
616 );
617 continue;
618 }
619
620 if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
621 debug!(
622 "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
623 idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
624 );
625 continue;
626 }
627
628 let extra_len = rpc_latest_header.inner.extra_data.len();
629 if extra_len <= 32 {
630 debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
631 continue;
632 }
633
634 if !accepted_check {
636 accepted_check = true;
637 env.set_current_block_info(BlockInfo {
639 hash: rpc_latest_header.hash,
640 number: rpc_latest_header.inner.number,
641 timestamp: rpc_latest_header.inner.timestamp,
642 })?;
643
644 env.active_node_state_mut()?.latest_header_time =
647 rpc_latest_header.inner.timestamp;
648 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
649 rpc_latest_header.hash;
650 }
651 }
652
653 if accepted_check {
654 Ok(())
655 } else {
656 Err(eyre::eyre!("No clients passed payload acceptance checks"))
657 }
658 })
659 }
660}
661
662#[derive(Debug, Default)]
664pub struct BroadcastNextNewPayload {
665 active_node_only: bool,
667}
668
669impl BroadcastNextNewPayload {
670 pub const fn with_active_node() -> Self {
672 Self { active_node_only: true }
673 }
674}
675
676impl<Engine> Action<Engine> for BroadcastNextNewPayload
677where
678 Engine: EngineTypes + PayloadTypes,
679 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
680 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
681{
682 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
683 Box::pin(async move {
684 let next_new_payload = env
686 .active_node_state()?
687 .latest_payload_built
688 .as_ref()
689 .ok_or_else(|| eyre::eyre!("No next built payload found"))?
690 .clone();
691 let parent_beacon_block_root = next_new_payload
692 .parent_beacon_block_root
693 .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
694
695 let payload_envelope = env
696 .active_node_state()?
697 .latest_payload_envelope
698 .as_ref()
699 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
700 .clone();
701
702 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
703 let execution_payload = execution_payload_envelope.execution_payload;
704
705 if self.active_node_only {
706 let active_idx = env.active_node_idx;
708 let engine = env.node_clients[active_idx].engine.http_client();
709
710 let result = EngineApiClient::<Engine>::new_payload_v3(
711 &engine,
712 execution_payload.clone(),
713 vec![],
714 parent_beacon_block_root,
715 )
716 .await?;
717
718 debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
719
720 match result.status {
722 PayloadStatusEnum::Valid => {
723 env.active_node_state_mut()?.latest_payload_executed =
724 Some(next_new_payload);
725 Ok(())
726 }
727 other => Err(eyre::eyre!(
728 "Active node {}: Unexpected payload status: {:?}",
729 active_idx,
730 other
731 )),
732 }
733 } else {
734 let mut broadcast_results = Vec::new();
736 let mut first_valid_seen = false;
737
738 for (idx, client) in env.node_clients.iter().enumerate() {
739 let engine = client.engine.http_client();
740
741 let result = EngineApiClient::<Engine>::new_payload_v3(
743 &engine,
744 execution_payload.clone(),
745 vec![],
746 parent_beacon_block_root,
747 )
748 .await?;
749
750 broadcast_results.push((idx, result.status.clone()));
751 debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
752
753 if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
755 first_valid_seen = true;
756 } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
757 debug!(
758 "Node {}: Invalid payload status returned from broadcast: {:?}",
759 idx, validation_error
760 );
761 }
762 }
763
764 if first_valid_seen {
766 env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
767 }
768
769 let any_valid =
771 broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
772 if !any_valid {
773 return Err(eyre::eyre!(
774 "Failed to successfully broadcast payload to any client"
775 ));
776 }
777
778 debug!("Broadcast complete. Results: {:?}", broadcast_results);
779
780 Ok(())
781 }
782 })
783 }
784}
785
786#[derive(Debug)]
788pub struct ProduceBlocks<Engine> {
789 pub num_blocks: u64,
791 _phantom: PhantomData<Engine>,
793}
794
795impl<Engine> ProduceBlocks<Engine> {
796 pub fn new(num_blocks: u64) -> Self {
798 Self { num_blocks, _phantom: Default::default() }
799 }
800}
801
802impl<Engine> Default for ProduceBlocks<Engine> {
803 fn default() -> Self {
804 Self::new(0)
805 }
806}
807
808impl<Engine> Action<Engine> for ProduceBlocks<Engine>
809where
810 Engine: EngineTypes + PayloadTypes,
811 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
812 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
813{
814 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
815 Box::pin(async move {
816 for _ in 0..self.num_blocks {
817 let mut sequence = Sequence::new(vec![
821 Box::new(PickNextBlockProducer::default()),
822 Box::new(GeneratePayloadAttributes::default()),
823 Box::new(GenerateNextPayload::default()),
824 Box::new(BroadcastNextNewPayload::default()),
825 Box::new(UpdateBlockInfoToLatestPayload::default()),
826 ]);
827 sequence.execute(env).await?;
828 }
829 Ok(())
830 })
831 }
832}
833
834#[derive(Debug)]
836pub struct TestFcuToTag {
837 pub tag: String,
839 pub expected_status: PayloadStatusEnum,
841}
842
843impl TestFcuToTag {
844 pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
846 Self { tag: tag.into(), expected_status }
847 }
848}
849
850impl<Engine> Action<Engine> for TestFcuToTag
851where
852 Engine: EngineTypes,
853{
854 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
855 Box::pin(async move {
856 let (target_block, _node_idx) = env
858 .block_registry
859 .get(&self.tag)
860 .copied()
861 .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
862
863 let engine_client = env.node_clients[0].engine.http_client();
864 let fcu_state = ForkchoiceState {
865 head_block_hash: target_block.hash,
866 safe_block_hash: target_block.hash,
867 finalized_block_hash: target_block.hash,
868 };
869
870 let fcu_response =
871 EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
872 .await?;
873
874 match (&fcu_response.payload_status.status, &self.expected_status) {
876 (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
877 debug!("FCU to '{}' returned VALID as expected", self.tag);
878 }
879 (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
880 debug!("FCU to '{}' returned INVALID as expected", self.tag);
881 }
882 (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
883 debug!("FCU to '{}' returned SYNCING as expected", self.tag);
884 }
885 (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
886 debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
887 }
888 (actual, expected) => {
889 return Err(eyre::eyre!(
890 "FCU to '{}': expected status {:?}, but got {:?}",
891 self.tag,
892 expected,
893 actual
894 ));
895 }
896 }
897
898 Ok(())
899 })
900 }
901}
902
903#[derive(Debug)]
905pub struct ExpectFcuStatus {
906 pub target_tag: String,
908 pub expected_status: PayloadStatusEnum,
910}
911
912impl ExpectFcuStatus {
913 pub fn valid(target_tag: impl Into<String>) -> Self {
915 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
916 }
917
918 pub fn invalid(target_tag: impl Into<String>) -> Self {
920 Self {
921 target_tag: target_tag.into(),
922 expected_status: PayloadStatusEnum::Invalid {
923 validation_error: "corrupted block".to_string(),
924 },
925 }
926 }
927
928 pub fn syncing(target_tag: impl Into<String>) -> Self {
930 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
931 }
932
933 pub fn accepted(target_tag: impl Into<String>) -> Self {
935 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
936 }
937}
938
939impl<Engine> Action<Engine> for ExpectFcuStatus
940where
941 Engine: EngineTypes,
942{
943 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
944 Box::pin(async move {
945 let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
946 test_fcu.execute(env).await
947 })
948 }
949}
950
951#[derive(Debug)]
953pub struct ValidateCanonicalTag {
954 pub tag: String,
956}
957
958impl ValidateCanonicalTag {
959 pub fn new(tag: impl Into<String>) -> Self {
961 Self { tag: tag.into() }
962 }
963}
964
965impl<Engine> Action<Engine> for ValidateCanonicalTag
966where
967 Engine: EngineTypes,
968{
969 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
970 Box::pin(async move {
971 let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
972 expect_valid.execute(env).await?;
973
974 debug!("Successfully validated that '{}' remains canonical", self.tag);
975 Ok(())
976 })
977 }
978}
979
980#[derive(Debug)]
983pub struct ProduceBlocksLocally<Engine> {
984 pub num_blocks: u64,
986 _phantom: PhantomData<Engine>,
988}
989
990impl<Engine> ProduceBlocksLocally<Engine> {
991 pub fn new(num_blocks: u64) -> Self {
993 Self { num_blocks, _phantom: Default::default() }
994 }
995}
996
997impl<Engine> Default for ProduceBlocksLocally<Engine> {
998 fn default() -> Self {
999 Self::new(0)
1000 }
1001}
1002
1003impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1004where
1005 Engine: EngineTypes + PayloadTypes,
1006 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1007 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1008{
1009 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1010 Box::pin(async move {
1011 let producer_idx = env.active_node_idx;
1013
1014 for _ in 0..self.num_blocks {
1015 env.last_producer_idx = Some(producer_idx);
1017
1018 let mut sequence = Sequence::new(vec![
1020 Box::new(GeneratePayloadAttributes::default()),
1022 Box::new(GenerateNextPayload::default()),
1023 Box::new(BroadcastNextNewPayload::with_active_node()),
1025 Box::new(UpdateBlockInfoToLatestPayload::default()),
1026 ]);
1027 sequence.execute(env).await?;
1028 }
1029 Ok(())
1030 })
1031 }
1032}
1033
1034#[derive(Debug)]
1036pub struct ProduceInvalidBlocks<Engine> {
1037 pub num_blocks: u64,
1039 pub invalid_indices: HashSet<u64>,
1041 _phantom: PhantomData<Engine>,
1043}
1044
1045impl<Engine> ProduceInvalidBlocks<Engine> {
1046 pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
1048 Self { num_blocks, invalid_indices, _phantom: Default::default() }
1049 }
1050
1051 pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
1054 let mut invalid_indices = HashSet::new();
1055 invalid_indices.insert(invalid_index);
1056 Self::new(num_blocks, invalid_indices)
1057 }
1058}
1059
1060impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1061where
1062 Engine: EngineTypes + PayloadTypes,
1063 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1064 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1065{
1066 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1067 Box::pin(async move {
1068 for block_index in 0..self.num_blocks {
1069 let is_invalid = self.invalid_indices.contains(&block_index);
1070
1071 if is_invalid {
1072 debug!("Producing invalid block at index {}", block_index);
1073
1074 let mut sequence = Sequence::new(vec![
1076 Box::new(PickNextBlockProducer::default()),
1077 Box::new(GeneratePayloadAttributes::default()),
1078 Box::new(GenerateNextPayload::default()),
1079 ]);
1080 sequence.execute(env).await?;
1081
1082 let latest_envelope =
1084 env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1085 || eyre::eyre!("No payload envelope available to corrupt"),
1086 )?;
1087
1088 let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1089 let mut corrupted_payload = envelope_v3.execution_payload;
1090
1091 corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1093
1094 debug!(
1095 "Corrupted state root for block {} to: {}",
1096 block_index, corrupted_payload.payload_inner.payload_inner.state_root
1097 );
1098
1099 let engine_client = env.node_clients[0].engine.http_client();
1101 let versioned_hashes = Vec::new();
1103 let parent_beacon_block_root = B256::random();
1105
1106 let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1107 &engine_client,
1108 corrupted_payload.clone(),
1109 versioned_hashes,
1110 parent_beacon_block_root,
1111 )
1112 .await?;
1113
1114 match new_payload_response.status {
1116 PayloadStatusEnum::Invalid { validation_error } => {
1117 debug!(
1118 "Block {} correctly rejected as invalid: {:?}",
1119 block_index, validation_error
1120 );
1121 }
1122 other_status => {
1123 return Err(eyre::eyre!(
1124 "Expected block {} to be rejected as INVALID, but got: {:?}",
1125 block_index,
1126 other_status
1127 ));
1128 }
1129 }
1130
1131 env.set_current_block_info(BlockInfo {
1133 hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1134 number: corrupted_payload.payload_inner.payload_inner.block_number,
1135 timestamp: corrupted_payload.timestamp(),
1136 })?;
1137 } else {
1138 debug!("Producing valid block at index {}", block_index);
1139
1140 let mut sequence = Sequence::new(vec![
1142 Box::new(PickNextBlockProducer::default()),
1143 Box::new(GeneratePayloadAttributes::default()),
1144 Box::new(GenerateNextPayload::default()),
1145 Box::new(BroadcastNextNewPayload::default()),
1146 Box::new(UpdateBlockInfoToLatestPayload::default()),
1147 ]);
1148 sequence.execute(env).await?;
1149 }
1150 }
1151 Ok(())
1152 })
1153 }
1154}