1use crate::testsuite::{
4 actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5 BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9 payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26 Engine: PayloadTypes,
27{
28 pub node_idx: usize,
30 pub transactions: Vec<Bytes>,
32 pub expected_hash: Option<B256>,
34 pub payload_attributes: Engine::PayloadAttributes,
37 _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43 Engine: PayloadTypes,
44{
45 pub fn new(
47 node_idx: usize,
48 transactions: Vec<Bytes>,
49 expected_hash: Option<B256>,
50 payload_attributes: Engine::PayloadAttributes,
51 ) -> Self {
52 Self {
53 node_idx,
54 transactions,
55 expected_hash,
56 payload_attributes,
57 _phantom: Default::default(),
58 }
59 }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64 Engine: EngineTypes,
65{
66 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67 Box::pin(async move {
68 if self.node_idx >= env.node_clients.len() {
69 return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70 }
71
72 let node_client = &env.node_clients[self.node_idx];
73 let rpc_client = &node_client.rpc;
74 let engine_client = node_client.engine.http_client();
75
76 let latest_block = EthApiClient::<
78 TransactionRequest,
79 Transaction,
80 Block,
81 Receipt,
82 Header,
83 TransactionSigned,
84 >::block_by_number(
85 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86 )
87 .await?;
88
89 let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90 let parent_hash = latest_block.header.hash;
91
92 debug!("Latest block hash: {parent_hash}");
93
94 let fork_choice_state = ForkchoiceState {
96 head_block_hash: parent_hash,
97 safe_block_hash: parent_hash,
98 finalized_block_hash: parent_hash,
99 };
100
101 match EngineApiClient::<Engine>::fork_choice_updated_v2(
103 &engine_client,
104 fork_choice_state,
105 Some(self.payload_attributes.clone()),
106 )
107 .await
108 {
109 Ok(fcu_result) => {
110 debug!(?fcu_result, "FCU v2 result");
111 match fcu_result.payload_status.status {
112 PayloadStatusEnum::Valid => {
113 if let Some(payload_id) = fcu_result.payload_id {
114 debug!(id=%payload_id, "Got payload");
115 let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116 &engine_client,
117 payload_id,
118 )
119 .await?;
120 Ok(())
121 } else {
122 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123 }
124 }
125 _ => Err(eyre::eyre!(
126 "Payload status not valid: {:?}",
127 fcu_result.payload_status
128 ))?,
129 }
130 }
131 Err(_) => {
132 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134 &engine_client,
135 fork_choice_state,
136 Some(self.payload_attributes.clone()),
137 )
138 .await?;
139
140 debug!(?fcu_result, "FCU v3 result");
141 match fcu_result.payload_status.status {
142 PayloadStatusEnum::Valid => {
143 if let Some(payload_id) = fcu_result.payload_id {
144 debug!(id=%payload_id, "Got payload");
145 let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146 &engine_client,
147 payload_id,
148 )
149 .await?;
150 Ok(())
151 } else {
152 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153 }
154 }
155 _ => Err(eyre::eyre!(
156 "Payload status not valid: {:?}",
157 fcu_result.payload_status
158 )),
159 }
160 }
161 }
162 })
163 }
164}
165
166#[derive(Debug, Default)]
168pub struct PickNextBlockProducer {}
169
170impl PickNextBlockProducer {
171 pub const fn new() -> Self {
173 Self {}
174 }
175}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179 Engine: EngineTypes,
180{
181 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182 Box::pin(async move {
183 let num_clients = env.node_clients.len();
184 if num_clients == 0 {
185 return Err(eyre::eyre!("No node clients available"));
186 }
187
188 let latest_info = env
189 .current_block_info()
190 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192 let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195 env.last_producer_idx = Some(next_producer_idx);
196 debug!(
197 "Selected node {} as the next block producer for block {}",
198 next_producer_idx,
199 latest_info.number + 1
200 );
201
202 Ok(())
203 })
204 }
205}
206
207#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213 Engine: EngineTypes + PayloadTypes,
214 Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217 Box::pin(async move {
218 let latest_block = env
219 .current_block_info()
220 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221 let block_number = latest_block.number;
222 let timestamp =
223 env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224 let payload_attributes = PayloadAttributes {
225 timestamp,
226 prev_randao: B256::random(),
227 suggested_fee_recipient: alloy_primitives::Address::random(),
228 withdrawals: Some(vec![]),
229 parent_beacon_block_root: Some(B256::ZERO),
230 };
231
232 env.active_node_state_mut()?
233 .payload_attributes
234 .insert(latest_block.number + 1, payload_attributes);
235 debug!("Stored payload attributes for block {}", block_number + 1);
236 Ok(())
237 })
238 }
239}
240
241#[derive(Debug, Default)]
243pub struct GenerateNextPayload {}
244
245impl<Engine> Action<Engine> for GenerateNextPayload
246where
247 Engine: EngineTypes + PayloadTypes,
248 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
249{
250 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
251 Box::pin(async move {
252 let latest_block = env
253 .current_block_info()
254 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
255
256 let parent_hash = latest_block.hash;
257 debug!("Latest block hash: {parent_hash}");
258
259 let fork_choice_state = ForkchoiceState {
260 head_block_hash: parent_hash,
261 safe_block_hash: parent_hash,
262 finalized_block_hash: parent_hash,
263 };
264
265 let payload_attributes = env
266 .active_node_state()?
267 .payload_attributes
268 .get(&(latest_block.number + 1))
269 .cloned()
270 .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
271
272 let producer_idx =
273 env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
274
275 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
276 &env.node_clients[producer_idx].engine.http_client(),
277 fork_choice_state,
278 Some(payload_attributes.clone().into()),
279 )
280 .await?;
281
282 debug!("FCU result: {:?}", fcu_result);
283
284 expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
288
289 let payload_id = if let Some(payload_id) = fcu_result.payload_id {
290 debug!("Received new payload ID: {:?}", payload_id);
291 payload_id
292 } else {
293 debug!("No payload ID returned, generating fresh payload attributes for forking");
294
295 let fresh_payload_attributes = PayloadAttributes {
296 timestamp: env.active_node_state()?.latest_header_time +
297 env.block_timestamp_increment,
298 prev_randao: B256::random(),
299 suggested_fee_recipient: alloy_primitives::Address::random(),
300 withdrawals: Some(vec![]),
301 parent_beacon_block_root: Some(B256::ZERO),
302 };
303
304 let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
305 &env.node_clients[producer_idx].engine.http_client(),
306 fork_choice_state,
307 Some(fresh_payload_attributes.clone().into()),
308 )
309 .await?;
310
311 debug!("Fresh FCU result: {:?}", fresh_fcu_result);
312
313 expect_fcu_not_syncing_or_accepted(
315 &fresh_fcu_result,
316 "GenerateNextPayload (fresh)",
317 )?;
318
319 if let Some(payload_id) = fresh_fcu_result.payload_id {
320 payload_id
321 } else {
322 debug!("Engine considers the fork base already canonical, skipping payload generation");
323 return Ok(());
324 }
325 };
326
327 env.active_node_state_mut()?.next_payload_id = Some(payload_id);
328
329 sleep(Duration::from_secs(1)).await;
330
331 let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
332 &env.node_clients[producer_idx].engine.http_client(),
333 payload_id,
334 )
335 .await?;
336
337 let built_payload = payload_attributes.clone();
339 env.active_node_state_mut()?
340 .payload_id_history
341 .insert(latest_block.number + 1, payload_id);
342 env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
343 env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
344
345 Ok(())
346 })
347 }
348}
349
350#[derive(Debug, Default)]
352pub struct BroadcastLatestForkchoice {}
353
354impl<Engine> Action<Engine> for BroadcastLatestForkchoice
355where
356 Engine: EngineTypes + PayloadTypes,
357 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
358 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
359{
360 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
361 Box::pin(async move {
362 if env.node_clients.is_empty() {
363 return Err(eyre::eyre!("No node clients available"));
364 }
365
366 let head_hash = if let Some(payload_envelope) =
368 &env.active_node_state()?.latest_payload_envelope
369 {
370 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
371 payload_envelope.clone().into();
372 let new_block_hash = execution_payload_envelope
373 .execution_payload
374 .payload_inner
375 .payload_inner
376 .block_hash;
377 debug!("Using newly executed block hash as head: {new_block_hash}");
378 new_block_hash
379 } else {
380 let rpc_client = &env.node_clients[0].rpc;
382 let current_head_block = EthApiClient::<
383 TransactionRequest,
384 Transaction,
385 Block,
386 Receipt,
387 Header,
388 TransactionSigned,
389 >::block_by_number(
390 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
391 )
392 .await?
393 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
394 debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
395 current_head_block.header.hash
396 };
397
398 let fork_choice_state = ForkchoiceState {
399 head_block_hash: head_hash,
400 safe_block_hash: head_hash,
401 finalized_block_hash: head_hash,
402 };
403 debug!(
404 "Broadcasting forkchoice update to {} clients. Head: {:?}",
405 env.node_clients.len(),
406 fork_choice_state.head_block_hash
407 );
408
409 for (idx, client) in env.node_clients.iter().enumerate() {
410 match EngineApiClient::<Engine>::fork_choice_updated_v3(
411 &client.engine.http_client(),
412 fork_choice_state,
413 None,
414 )
415 .await
416 {
417 Ok(resp) => {
418 debug!(
419 "Client {}: Forkchoice update status: {:?}",
420 idx, resp.payload_status.status
421 );
422 validate_fcu_response(&resp, &format!("Client {idx}"))?;
424 }
425 Err(err) => {
426 return Err(eyre::eyre!(
427 "Client {}: Failed to broadcast forkchoice: {:?}",
428 idx,
429 err
430 ));
431 }
432 }
433 }
434 debug!("Forkchoice update broadcasted successfully");
435 Ok(())
436 })
437 }
438}
439
440#[derive(Debug, Default)]
446pub struct UpdateBlockInfo {}
447
448impl<Engine> Action<Engine> for UpdateBlockInfo
449where
450 Engine: EngineTypes,
451{
452 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
453 Box::pin(async move {
454 let rpc_client = &env.node_clients[0].rpc;
456 let latest_block = EthApiClient::<
457 TransactionRequest,
458 Transaction,
459 Block,
460 Receipt,
461 Header,
462 TransactionSigned,
463 >::block_by_number(
464 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
465 )
466 .await?
467 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
468
469 env.set_current_block_info(BlockInfo {
471 hash: latest_block.header.hash,
472 number: latest_block.header.number,
473 timestamp: latest_block.header.timestamp,
474 })?;
475
476 env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
477 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
478 latest_block.header.hash;
479
480 debug!(
481 "Updated environment to block {} (hash: {})",
482 latest_block.header.number, latest_block.header.hash
483 );
484
485 Ok(())
486 })
487 }
488}
489
490#[derive(Debug, Default)]
496pub struct UpdateBlockInfoToLatestPayload {}
497
498impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
499where
500 Engine: EngineTypes + PayloadTypes,
501 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
502{
503 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
504 Box::pin(async move {
505 let payload_envelope = env
506 .active_node_state()?
507 .latest_payload_envelope
508 .as_ref()
509 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
510
511 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
512 payload_envelope.clone().into();
513 let execution_payload = execution_payload_envelope.execution_payload;
514
515 let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
516 let block_number = execution_payload.payload_inner.payload_inner.block_number;
517 let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
518
519 env.set_current_block_info(BlockInfo {
521 hash: block_hash,
522 number: block_number,
523 timestamp: block_timestamp,
524 })?;
525
526 env.active_node_state_mut()?.latest_header_time = block_timestamp;
527 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
528
529 debug!(
530 "Updated environment to newly produced block {} (hash: {})",
531 block_number, block_hash
532 );
533
534 Ok(())
535 })
536 }
537}
538
539#[derive(Debug, Default)]
541pub struct CheckPayloadAccepted {}
542
543impl<Engine> Action<Engine> for CheckPayloadAccepted
544where
545 Engine: EngineTypes,
546 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
547{
548 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
549 Box::pin(async move {
550 let mut accepted_check: bool = false;
551
552 let latest_block = env
553 .current_block_info()
554 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
555
556 let payload_id = *env
557 .active_node_state()?
558 .payload_id_history
559 .get(&(latest_block.number + 1))
560 .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
561
562 let node_clients = env.node_clients.clone();
563 for (idx, client) in node_clients.iter().enumerate() {
564 let rpc_client = &client.rpc;
565
566 let rpc_latest_header = EthApiClient::<
568 TransactionRequest,
569 Transaction,
570 Block,
571 Receipt,
572 Header,
573 TransactionSigned,
574 >::header_by_number(
575 rpc_client, alloy_eips::BlockNumberOrTag::Latest
576 )
577 .await?
578 .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
579
580 let next_new_payload = env
582 .active_node_state()?
583 .latest_payload_built
584 .as_ref()
585 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
586
587 let built_payload = EngineApiClient::<Engine>::get_payload_v3(
588 &client.engine.http_client(),
589 payload_id,
590 )
591 .await?;
592
593 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
594 let new_payload_block_hash = execution_payload_envelope
595 .execution_payload
596 .payload_inner
597 .payload_inner
598 .block_hash;
599
600 if rpc_latest_header.hash != new_payload_block_hash {
601 debug!(
602 "Client {}: The hash is not matched: {:?} {:?}",
603 idx, rpc_latest_header.hash, new_payload_block_hash
604 );
605 continue;
606 }
607
608 if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
609 debug!(
610 "Client {}: difficulty != 0: {:?}",
611 idx, rpc_latest_header.inner.difficulty
612 );
613 continue;
614 }
615
616 if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
617 debug!(
618 "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
619 idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
620 );
621 continue;
622 }
623
624 let extra_len = rpc_latest_header.inner.extra_data.len();
625 if extra_len <= 32 {
626 debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
627 continue;
628 }
629
630 if !accepted_check {
632 accepted_check = true;
633 env.set_current_block_info(BlockInfo {
635 hash: rpc_latest_header.hash,
636 number: rpc_latest_header.inner.number,
637 timestamp: rpc_latest_header.inner.timestamp,
638 })?;
639
640 env.active_node_state_mut()?.latest_header_time =
643 rpc_latest_header.inner.timestamp;
644 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
645 rpc_latest_header.hash;
646 }
647 }
648
649 if accepted_check {
650 Ok(())
651 } else {
652 Err(eyre::eyre!("No clients passed payload acceptance checks"))
653 }
654 })
655 }
656}
657
658#[derive(Debug, Default)]
660pub struct BroadcastNextNewPayload {
661 active_node_only: bool,
663}
664
665impl BroadcastNextNewPayload {
666 pub const fn with_active_node() -> Self {
668 Self { active_node_only: true }
669 }
670}
671
672impl<Engine> Action<Engine> for BroadcastNextNewPayload
673where
674 Engine: EngineTypes + PayloadTypes,
675 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
676 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
677{
678 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
679 Box::pin(async move {
680 let next_new_payload = env
682 .active_node_state()?
683 .latest_payload_built
684 .as_ref()
685 .ok_or_else(|| eyre::eyre!("No next built payload found"))?
686 .clone();
687 let parent_beacon_block_root = next_new_payload
688 .parent_beacon_block_root
689 .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
690
691 let payload_envelope = env
692 .active_node_state()?
693 .latest_payload_envelope
694 .as_ref()
695 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
696 .clone();
697
698 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
699 let execution_payload = execution_payload_envelope.execution_payload;
700
701 if self.active_node_only {
702 let active_idx = env.active_node_idx;
704 let engine = env.node_clients[active_idx].engine.http_client();
705
706 let result = EngineApiClient::<Engine>::new_payload_v3(
707 &engine,
708 execution_payload.clone(),
709 vec![],
710 parent_beacon_block_root,
711 )
712 .await?;
713
714 debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
715
716 match result.status {
718 PayloadStatusEnum::Valid => {
719 env.active_node_state_mut()?.latest_payload_executed =
720 Some(next_new_payload);
721 Ok(())
722 }
723 other => Err(eyre::eyre!(
724 "Active node {}: Unexpected payload status: {:?}",
725 active_idx,
726 other
727 )),
728 }
729 } else {
730 let mut broadcast_results = Vec::new();
732 let mut first_valid_seen = false;
733
734 for (idx, client) in env.node_clients.iter().enumerate() {
735 let engine = client.engine.http_client();
736
737 let result = EngineApiClient::<Engine>::new_payload_v3(
739 &engine,
740 execution_payload.clone(),
741 vec![],
742 parent_beacon_block_root,
743 )
744 .await?;
745
746 broadcast_results.push((idx, result.status.clone()));
747 debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
748
749 if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
751 first_valid_seen = true;
752 } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
753 debug!(
754 "Node {}: Invalid payload status returned from broadcast: {:?}",
755 idx, validation_error
756 );
757 }
758 }
759
760 if first_valid_seen {
762 env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
763 }
764
765 let any_valid =
767 broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
768 if !any_valid {
769 return Err(eyre::eyre!(
770 "Failed to successfully broadcast payload to any client"
771 ));
772 }
773
774 debug!("Broadcast complete. Results: {:?}", broadcast_results);
775
776 Ok(())
777 }
778 })
779 }
780}
781
782#[derive(Debug)]
784pub struct ProduceBlocks<Engine> {
785 pub num_blocks: u64,
787 _phantom: PhantomData<Engine>,
789}
790
791impl<Engine> ProduceBlocks<Engine> {
792 pub fn new(num_blocks: u64) -> Self {
794 Self { num_blocks, _phantom: Default::default() }
795 }
796}
797
798impl<Engine> Default for ProduceBlocks<Engine> {
799 fn default() -> Self {
800 Self::new(0)
801 }
802}
803
804impl<Engine> Action<Engine> for ProduceBlocks<Engine>
805where
806 Engine: EngineTypes + PayloadTypes,
807 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
808 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
809{
810 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
811 Box::pin(async move {
812 for _ in 0..self.num_blocks {
813 let mut sequence = Sequence::new(vec![
817 Box::new(PickNextBlockProducer::default()),
818 Box::new(GeneratePayloadAttributes::default()),
819 Box::new(GenerateNextPayload::default()),
820 Box::new(BroadcastNextNewPayload::default()),
821 Box::new(UpdateBlockInfoToLatestPayload::default()),
822 ]);
823 sequence.execute(env).await?;
824 }
825 Ok(())
826 })
827 }
828}
829
830#[derive(Debug)]
832pub struct TestFcuToTag {
833 pub tag: String,
835 pub expected_status: PayloadStatusEnum,
837}
838
839impl TestFcuToTag {
840 pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
842 Self { tag: tag.into(), expected_status }
843 }
844}
845
846impl<Engine> Action<Engine> for TestFcuToTag
847where
848 Engine: EngineTypes,
849{
850 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
851 Box::pin(async move {
852 let (target_block, _node_idx) = env
854 .block_registry
855 .get(&self.tag)
856 .copied()
857 .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
858
859 let engine_client = env.node_clients[0].engine.http_client();
860 let fcu_state = ForkchoiceState {
861 head_block_hash: target_block.hash,
862 safe_block_hash: target_block.hash,
863 finalized_block_hash: target_block.hash,
864 };
865
866 let fcu_response =
867 EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
868 .await?;
869
870 match (&fcu_response.payload_status.status, &self.expected_status) {
872 (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
873 debug!("FCU to '{}' returned VALID as expected", self.tag);
874 }
875 (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
876 debug!("FCU to '{}' returned INVALID as expected", self.tag);
877 }
878 (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
879 debug!("FCU to '{}' returned SYNCING as expected", self.tag);
880 }
881 (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
882 debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
883 }
884 (actual, expected) => {
885 return Err(eyre::eyre!(
886 "FCU to '{}': expected status {:?}, but got {:?}",
887 self.tag,
888 expected,
889 actual
890 ));
891 }
892 }
893
894 Ok(())
895 })
896 }
897}
898
899#[derive(Debug)]
901pub struct ExpectFcuStatus {
902 pub target_tag: String,
904 pub expected_status: PayloadStatusEnum,
906}
907
908impl ExpectFcuStatus {
909 pub fn valid(target_tag: impl Into<String>) -> Self {
911 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
912 }
913
914 pub fn invalid(target_tag: impl Into<String>) -> Self {
916 Self {
917 target_tag: target_tag.into(),
918 expected_status: PayloadStatusEnum::Invalid {
919 validation_error: "corrupted block".to_string(),
920 },
921 }
922 }
923
924 pub fn syncing(target_tag: impl Into<String>) -> Self {
926 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
927 }
928
929 pub fn accepted(target_tag: impl Into<String>) -> Self {
931 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
932 }
933}
934
935impl<Engine> Action<Engine> for ExpectFcuStatus
936where
937 Engine: EngineTypes,
938{
939 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
940 Box::pin(async move {
941 let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
942 test_fcu.execute(env).await
943 })
944 }
945}
946
947#[derive(Debug)]
949pub struct ValidateCanonicalTag {
950 pub tag: String,
952}
953
954impl ValidateCanonicalTag {
955 pub fn new(tag: impl Into<String>) -> Self {
957 Self { tag: tag.into() }
958 }
959}
960
961impl<Engine> Action<Engine> for ValidateCanonicalTag
962where
963 Engine: EngineTypes,
964{
965 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
966 Box::pin(async move {
967 let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
968 expect_valid.execute(env).await?;
969
970 debug!("Successfully validated that '{}' remains canonical", self.tag);
971 Ok(())
972 })
973 }
974}
975
976#[derive(Debug)]
979pub struct ProduceBlocksLocally<Engine> {
980 pub num_blocks: u64,
982 _phantom: PhantomData<Engine>,
984}
985
986impl<Engine> ProduceBlocksLocally<Engine> {
987 pub fn new(num_blocks: u64) -> Self {
989 Self { num_blocks, _phantom: Default::default() }
990 }
991}
992
993impl<Engine> Default for ProduceBlocksLocally<Engine> {
994 fn default() -> Self {
995 Self::new(0)
996 }
997}
998
999impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1000where
1001 Engine: EngineTypes + PayloadTypes,
1002 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1003 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1004{
1005 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1006 Box::pin(async move {
1007 let producer_idx = env.active_node_idx;
1009
1010 for _ in 0..self.num_blocks {
1011 env.last_producer_idx = Some(producer_idx);
1013
1014 let mut sequence = Sequence::new(vec![
1016 Box::new(GeneratePayloadAttributes::default()),
1018 Box::new(GenerateNextPayload::default()),
1019 Box::new(BroadcastNextNewPayload::with_active_node()),
1021 Box::new(UpdateBlockInfoToLatestPayload::default()),
1022 ]);
1023 sequence.execute(env).await?;
1024 }
1025 Ok(())
1026 })
1027 }
1028}
1029
1030#[derive(Debug)]
1032pub struct ProduceInvalidBlocks<Engine> {
1033 pub num_blocks: u64,
1035 pub invalid_indices: HashSet<u64>,
1037 _phantom: PhantomData<Engine>,
1039}
1040
1041impl<Engine> ProduceInvalidBlocks<Engine> {
1042 pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
1044 Self { num_blocks, invalid_indices, _phantom: Default::default() }
1045 }
1046
1047 pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
1050 let mut invalid_indices = HashSet::new();
1051 invalid_indices.insert(invalid_index);
1052 Self::new(num_blocks, invalid_indices)
1053 }
1054}
1055
1056impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1057where
1058 Engine: EngineTypes + PayloadTypes,
1059 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1060 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1061{
1062 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1063 Box::pin(async move {
1064 for block_index in 0..self.num_blocks {
1065 let is_invalid = self.invalid_indices.contains(&block_index);
1066
1067 if is_invalid {
1068 debug!("Producing invalid block at index {}", block_index);
1069
1070 let mut sequence = Sequence::new(vec![
1072 Box::new(PickNextBlockProducer::default()),
1073 Box::new(GeneratePayloadAttributes::default()),
1074 Box::new(GenerateNextPayload::default()),
1075 ]);
1076 sequence.execute(env).await?;
1077
1078 let latest_envelope =
1080 env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1081 || eyre::eyre!("No payload envelope available to corrupt"),
1082 )?;
1083
1084 let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1085 let mut corrupted_payload = envelope_v3.execution_payload;
1086
1087 corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1089
1090 debug!(
1091 "Corrupted state root for block {} to: {}",
1092 block_index, corrupted_payload.payload_inner.payload_inner.state_root
1093 );
1094
1095 let engine_client = env.node_clients[0].engine.http_client();
1097 let versioned_hashes = Vec::new();
1099 let parent_beacon_block_root = B256::random();
1101
1102 let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1103 &engine_client,
1104 corrupted_payload.clone(),
1105 versioned_hashes,
1106 parent_beacon_block_root,
1107 )
1108 .await?;
1109
1110 match new_payload_response.status {
1112 PayloadStatusEnum::Invalid { validation_error } => {
1113 debug!(
1114 "Block {} correctly rejected as invalid: {:?}",
1115 block_index, validation_error
1116 );
1117 }
1118 other_status => {
1119 return Err(eyre::eyre!(
1120 "Expected block {} to be rejected as INVALID, but got: {:?}",
1121 block_index,
1122 other_status
1123 ));
1124 }
1125 }
1126
1127 env.set_current_block_info(BlockInfo {
1129 hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1130 number: corrupted_payload.payload_inner.payload_inner.block_number,
1131 timestamp: corrupted_payload.timestamp(),
1132 })?;
1133 } else {
1134 debug!("Producing valid block at index {}", block_index);
1135
1136 let mut sequence = Sequence::new(vec![
1138 Box::new(PickNextBlockProducer::default()),
1139 Box::new(GeneratePayloadAttributes::default()),
1140 Box::new(GenerateNextPayload::default()),
1141 Box::new(BroadcastNextNewPayload::default()),
1142 Box::new(UpdateBlockInfoToLatestPayload::default()),
1143 ]);
1144 sequence.execute(env).await?;
1145 }
1146 }
1147 Ok(())
1148 })
1149 }
1150}