1use crate::testsuite::{
4 actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5 BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9 payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26 Engine: PayloadTypes,
27{
28 pub node_idx: usize,
30 pub transactions: Vec<Bytes>,
32 pub expected_hash: Option<B256>,
34 pub payload_attributes: Engine::PayloadAttributes,
37 _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43 Engine: PayloadTypes,
44{
45 pub fn new(
47 node_idx: usize,
48 transactions: Vec<Bytes>,
49 expected_hash: Option<B256>,
50 payload_attributes: Engine::PayloadAttributes,
51 ) -> Self {
52 Self {
53 node_idx,
54 transactions,
55 expected_hash,
56 payload_attributes,
57 _phantom: Default::default(),
58 }
59 }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64 Engine: EngineTypes,
65{
66 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67 Box::pin(async move {
68 if self.node_idx >= env.node_clients.len() {
69 return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70 }
71
72 let node_client = &env.node_clients[self.node_idx];
73 let rpc_client = &node_client.rpc;
74 let engine_client = node_client.engine.http_client();
75
76 let latest_block = EthApiClient::<
78 TransactionRequest,
79 Transaction,
80 Block,
81 Receipt,
82 Header,
83 TransactionSigned,
84 >::block_by_number(
85 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86 )
87 .await?;
88
89 let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90 let parent_hash = latest_block.header.hash;
91
92 debug!("Latest block hash: {parent_hash}");
93
94 let fork_choice_state = ForkchoiceState {
96 head_block_hash: parent_hash,
97 safe_block_hash: parent_hash,
98 finalized_block_hash: parent_hash,
99 };
100
101 match EngineApiClient::<Engine>::fork_choice_updated_v2(
103 &engine_client,
104 fork_choice_state,
105 Some(self.payload_attributes.clone()),
106 )
107 .await
108 {
109 Ok(fcu_result) => {
110 debug!(?fcu_result, "FCU v2 result");
111 match fcu_result.payload_status.status {
112 PayloadStatusEnum::Valid => {
113 if let Some(payload_id) = fcu_result.payload_id {
114 debug!(id=%payload_id, "Got payload");
115 let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116 &engine_client,
117 payload_id,
118 )
119 .await?;
120 Ok(())
121 } else {
122 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123 }
124 }
125 _ => Err(eyre::eyre!(
126 "Payload status not valid: {:?}",
127 fcu_result.payload_status
128 ))?,
129 }
130 }
131 Err(_) => {
132 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134 &engine_client,
135 fork_choice_state,
136 Some(self.payload_attributes.clone()),
137 )
138 .await?;
139
140 debug!(?fcu_result, "FCU v3 result");
141 match fcu_result.payload_status.status {
142 PayloadStatusEnum::Valid => {
143 if let Some(payload_id) = fcu_result.payload_id {
144 debug!(id=%payload_id, "Got payload");
145 let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146 &engine_client,
147 payload_id,
148 )
149 .await?;
150 Ok(())
151 } else {
152 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153 }
154 }
155 _ => Err(eyre::eyre!(
156 "Payload status not valid: {:?}",
157 fcu_result.payload_status
158 )),
159 }
160 }
161 }
162 })
163 }
164}
165
/// Action that chooses which node produces the next block.
///
/// The action itself carries no configuration.
#[derive(Debug, Default)]
pub struct PickNextBlockProducer {}

impl PickNextBlockProducer {
    /// Creates the action.
    pub const fn new() -> Self {
        Self {}
    }
}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179 Engine: EngineTypes,
180{
181 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182 Box::pin(async move {
183 let num_clients = env.node_clients.len();
184 if num_clients == 0 {
185 return Err(eyre::eyre!("No node clients available"));
186 }
187
188 let latest_info = env
189 .current_block_info()
190 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192 let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195 env.last_producer_idx = Some(next_producer_idx);
196 debug!(
197 "Selected node {} as the next block producer for block {}",
198 next_producer_idx,
199 latest_info.number + 1
200 );
201
202 Ok(())
203 })
204 }
205}
206
207#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213 Engine: EngineTypes + PayloadTypes,
214 Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217 Box::pin(async move {
218 let latest_block = env
219 .current_block_info()
220 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221 let block_number = latest_block.number;
222 let timestamp =
223 env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224 let payload_attributes = PayloadAttributes {
225 timestamp,
226 prev_randao: B256::random(),
227 suggested_fee_recipient: alloy_primitives::Address::random(),
228 withdrawals: Some(vec![]),
229 parent_beacon_block_root: Some(B256::ZERO),
230 slot_number: None,
231 };
232
233 env.active_node_state_mut()?
234 .payload_attributes
235 .insert(latest_block.number + 1, payload_attributes);
236 debug!("Stored payload attributes for block {}", block_number + 1);
237 Ok(())
238 })
239 }
240}
241
242#[derive(Debug, Default)]
244pub struct GenerateNextPayload {}
245
246impl<Engine> Action<Engine> for GenerateNextPayload
247where
248 Engine: EngineTypes + PayloadTypes,
249 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
250{
251 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
252 Box::pin(async move {
253 let latest_block = env
254 .current_block_info()
255 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
256
257 let parent_hash = latest_block.hash;
258 debug!("Latest block hash: {parent_hash}");
259
260 let fork_choice_state = ForkchoiceState {
261 head_block_hash: parent_hash,
262 safe_block_hash: parent_hash,
263 finalized_block_hash: parent_hash,
264 };
265
266 let payload_attributes = env
267 .active_node_state()?
268 .payload_attributes
269 .get(&(latest_block.number + 1))
270 .cloned()
271 .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
272
273 let producer_idx =
274 env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
275
276 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
277 &env.node_clients[producer_idx].engine.http_client(),
278 fork_choice_state,
279 Some(payload_attributes.clone().into()),
280 )
281 .await?;
282
283 debug!("FCU result: {:?}", fcu_result);
284
285 expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
289
290 let payload_id = if let Some(payload_id) = fcu_result.payload_id {
291 debug!("Received new payload ID: {:?}", payload_id);
292 payload_id
293 } else {
294 debug!("No payload ID returned, generating fresh payload attributes for forking");
295
296 let fresh_payload_attributes = PayloadAttributes {
297 timestamp: env.active_node_state()?.latest_header_time +
298 env.block_timestamp_increment,
299 prev_randao: B256::random(),
300 suggested_fee_recipient: alloy_primitives::Address::random(),
301 withdrawals: Some(vec![]),
302 parent_beacon_block_root: Some(B256::ZERO),
303 slot_number: None,
304 };
305
306 let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
307 &env.node_clients[producer_idx].engine.http_client(),
308 fork_choice_state,
309 Some(fresh_payload_attributes.clone().into()),
310 )
311 .await?;
312
313 debug!("Fresh FCU result: {:?}", fresh_fcu_result);
314
315 expect_fcu_not_syncing_or_accepted(
317 &fresh_fcu_result,
318 "GenerateNextPayload (fresh)",
319 )?;
320
321 if let Some(payload_id) = fresh_fcu_result.payload_id {
322 payload_id
323 } else {
324 debug!("Engine considers the fork base already canonical, skipping payload generation");
325 return Ok(());
326 }
327 };
328
329 env.active_node_state_mut()?.next_payload_id = Some(payload_id);
330
331 sleep(Duration::from_secs(1)).await;
332
333 let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
334 &env.node_clients[producer_idx].engine.http_client(),
335 payload_id,
336 )
337 .await?;
338
339 let built_payload = payload_attributes.clone();
341 env.active_node_state_mut()?
342 .payload_id_history
343 .insert(latest_block.number + 1, payload_id);
344 env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
345 env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
346
347 Ok(())
348 })
349 }
350}
351
352#[derive(Debug, Default)]
354pub struct BroadcastLatestForkchoice {}
355
356impl<Engine> Action<Engine> for BroadcastLatestForkchoice
357where
358 Engine: EngineTypes + PayloadTypes,
359 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
360 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
361{
362 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
363 Box::pin(async move {
364 if env.node_clients.is_empty() {
365 return Err(eyre::eyre!("No node clients available"));
366 }
367
368 let head_hash = if let Some(payload_envelope) =
370 &env.active_node_state()?.latest_payload_envelope
371 {
372 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
373 payload_envelope.clone().into();
374 let new_block_hash = execution_payload_envelope
375 .execution_payload
376 .payload_inner
377 .payload_inner
378 .block_hash;
379 debug!("Using newly executed block hash as head: {new_block_hash}");
380 new_block_hash
381 } else {
382 let rpc_client = &env.node_clients[0].rpc;
384 let current_head_block = EthApiClient::<
385 TransactionRequest,
386 Transaction,
387 Block,
388 Receipt,
389 Header,
390 TransactionSigned,
391 >::block_by_number(
392 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
393 )
394 .await?
395 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
396 debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
397 current_head_block.header.hash
398 };
399
400 let fork_choice_state = ForkchoiceState {
401 head_block_hash: head_hash,
402 safe_block_hash: head_hash,
403 finalized_block_hash: head_hash,
404 };
405 debug!(
406 "Broadcasting forkchoice update to {} clients. Head: {:?}",
407 env.node_clients.len(),
408 fork_choice_state.head_block_hash
409 );
410
411 for (idx, client) in env.node_clients.iter().enumerate() {
412 match EngineApiClient::<Engine>::fork_choice_updated_v3(
413 &client.engine.http_client(),
414 fork_choice_state,
415 None,
416 )
417 .await
418 {
419 Ok(resp) => {
420 debug!(
421 "Client {}: Forkchoice update status: {:?}",
422 idx, resp.payload_status.status
423 );
424 validate_fcu_response(&resp, &format!("Client {idx}"))?;
426 }
427 Err(err) => {
428 return Err(eyre::eyre!(
429 "Client {}: Failed to broadcast forkchoice: {:?}",
430 idx,
431 err
432 ));
433 }
434 }
435 }
436 debug!("Forkchoice update broadcasted successfully");
437 Ok(())
438 })
439 }
440}
441
442#[derive(Debug, Default)]
448pub struct UpdateBlockInfo {}
449
450impl<Engine> Action<Engine> for UpdateBlockInfo
451where
452 Engine: EngineTypes,
453{
454 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
455 Box::pin(async move {
456 let rpc_client = &env.node_clients[0].rpc;
458 let latest_block = EthApiClient::<
459 TransactionRequest,
460 Transaction,
461 Block,
462 Receipt,
463 Header,
464 TransactionSigned,
465 >::block_by_number(
466 rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
467 )
468 .await?
469 .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
470
471 env.set_current_block_info(BlockInfo {
473 hash: latest_block.header.hash,
474 number: latest_block.header.number,
475 timestamp: latest_block.header.timestamp,
476 })?;
477
478 env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
479 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
480 latest_block.header.hash;
481
482 debug!(
483 "Updated environment to block {} (hash: {})",
484 latest_block.header.number, latest_block.header.hash
485 );
486
487 Ok(())
488 })
489 }
490}
491
492#[derive(Debug, Default)]
498pub struct UpdateBlockInfoToLatestPayload {}
499
500impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
501where
502 Engine: EngineTypes + PayloadTypes,
503 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
504{
505 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
506 Box::pin(async move {
507 let payload_envelope = env
508 .active_node_state()?
509 .latest_payload_envelope
510 .as_ref()
511 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
512
513 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
514 payload_envelope.clone().into();
515 let execution_payload = execution_payload_envelope.execution_payload;
516
517 let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
518 let block_number = execution_payload.payload_inner.payload_inner.block_number;
519 let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
520
521 env.set_current_block_info(BlockInfo {
523 hash: block_hash,
524 number: block_number,
525 timestamp: block_timestamp,
526 })?;
527
528 env.active_node_state_mut()?.latest_header_time = block_timestamp;
529 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
530
531 debug!(
532 "Updated environment to newly produced block {} (hash: {})",
533 block_number, block_hash
534 );
535
536 Ok(())
537 })
538 }
539}
540
541#[derive(Debug, Default)]
543pub struct CheckPayloadAccepted {}
544
545impl<Engine> Action<Engine> for CheckPayloadAccepted
546where
547 Engine: EngineTypes,
548 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
549{
550 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
551 Box::pin(async move {
552 let mut accepted_check: bool = false;
553
554 let latest_block = env
555 .current_block_info()
556 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
557
558 let payload_id = *env
559 .active_node_state()?
560 .payload_id_history
561 .get(&(latest_block.number + 1))
562 .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
563
564 let node_clients = env.node_clients.clone();
565 for (idx, client) in node_clients.iter().enumerate() {
566 let rpc_client = &client.rpc;
567
568 let rpc_latest_header = EthApiClient::<
570 TransactionRequest,
571 Transaction,
572 Block,
573 Receipt,
574 Header,
575 TransactionSigned,
576 >::header_by_number(
577 rpc_client, alloy_eips::BlockNumberOrTag::Latest
578 )
579 .await?
580 .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
581
582 let next_new_payload = env
584 .active_node_state()?
585 .latest_payload_built
586 .as_ref()
587 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
588
589 let built_payload = EngineApiClient::<Engine>::get_payload_v3(
590 &client.engine.http_client(),
591 payload_id,
592 )
593 .await?;
594
595 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
596 let new_payload_block_hash = execution_payload_envelope
597 .execution_payload
598 .payload_inner
599 .payload_inner
600 .block_hash;
601
602 if rpc_latest_header.hash != new_payload_block_hash {
603 debug!(
604 "Client {}: The hash is not matched: {:?} {:?}",
605 idx, rpc_latest_header.hash, new_payload_block_hash
606 );
607 continue;
608 }
609
610 if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
611 debug!(
612 "Client {}: difficulty != 0: {:?}",
613 idx, rpc_latest_header.inner.difficulty
614 );
615 continue;
616 }
617
618 if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
619 debug!(
620 "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
621 idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
622 );
623 continue;
624 }
625
626 let extra_len = rpc_latest_header.inner.extra_data.len();
627 if extra_len <= 32 {
628 debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
629 continue;
630 }
631
632 if !accepted_check {
634 accepted_check = true;
635 env.set_current_block_info(BlockInfo {
637 hash: rpc_latest_header.hash,
638 number: rpc_latest_header.inner.number,
639 timestamp: rpc_latest_header.inner.timestamp,
640 })?;
641
642 env.active_node_state_mut()?.latest_header_time =
645 rpc_latest_header.inner.timestamp;
646 env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
647 rpc_latest_header.hash;
648 }
649 }
650
651 if accepted_check {
652 Ok(())
653 } else {
654 Err(eyre::eyre!("No clients passed payload acceptance checks"))
655 }
656 })
657 }
658}
659
/// Action that submits the latest built payload to node clients via `newPayload`.
///
/// The default value broadcasts to every client.
#[derive(Debug, Default)]
pub struct BroadcastNextNewPayload {
    /// When `true`, the payload is sent to the active node only instead of all clients.
    active_node_only: bool,
}

impl BroadcastNextNewPayload {
    /// Creates a variant that sends the payload to the active node only.
    pub const fn with_active_node() -> Self {
        Self { active_node_only: true }
    }
}
673
674impl<Engine> Action<Engine> for BroadcastNextNewPayload
675where
676 Engine: EngineTypes + PayloadTypes,
677 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
678 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
679{
680 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
681 Box::pin(async move {
682 let next_new_payload = env
684 .active_node_state()?
685 .latest_payload_built
686 .as_ref()
687 .ok_or_else(|| eyre::eyre!("No next built payload found"))?
688 .clone();
689 let parent_beacon_block_root = next_new_payload
690 .parent_beacon_block_root
691 .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
692
693 let payload_envelope = env
694 .active_node_state()?
695 .latest_payload_envelope
696 .as_ref()
697 .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
698 .clone();
699
700 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
701 let execution_payload = execution_payload_envelope.execution_payload;
702
703 if self.active_node_only {
704 let active_idx = env.active_node_idx;
706 let engine = env.node_clients[active_idx].engine.http_client();
707
708 let result = EngineApiClient::<Engine>::new_payload_v3(
709 &engine,
710 execution_payload.clone(),
711 vec![],
712 parent_beacon_block_root,
713 )
714 .await?;
715
716 debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
717
718 match result.status {
720 PayloadStatusEnum::Valid => {
721 env.active_node_state_mut()?.latest_payload_executed =
722 Some(next_new_payload);
723 Ok(())
724 }
725 other => Err(eyre::eyre!(
726 "Active node {}: Unexpected payload status: {:?}",
727 active_idx,
728 other
729 )),
730 }
731 } else {
732 let mut broadcast_results = Vec::new();
734 let mut first_valid_seen = false;
735
736 for (idx, client) in env.node_clients.iter().enumerate() {
737 let engine = client.engine.http_client();
738
739 let result = EngineApiClient::<Engine>::new_payload_v3(
741 &engine,
742 execution_payload.clone(),
743 vec![],
744 parent_beacon_block_root,
745 )
746 .await?;
747
748 broadcast_results.push((idx, result.status.clone()));
749 debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
750
751 if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
753 first_valid_seen = true;
754 } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
755 debug!(
756 "Node {}: Invalid payload status returned from broadcast: {:?}",
757 idx, validation_error
758 );
759 }
760 }
761
762 if first_valid_seen {
764 env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
765 }
766
767 let any_valid =
769 broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
770 if !any_valid {
771 return Err(eyre::eyre!(
772 "Failed to successfully broadcast payload to any client"
773 ));
774 }
775
776 debug!("Broadcast complete. Results: {:?}", broadcast_results);
777
778 Ok(())
779 }
780 })
781 }
782}
783
/// Action that produces a number of blocks by repeatedly running the block-production steps.
#[derive(Debug)]
pub struct ProduceBlocks<Engine> {
    /// How many blocks to produce.
    pub num_blocks: u64,
    /// Binds the action to its engine type without holding a value of that type.
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocks<Engine> {
    /// Creates an action that produces `num_blocks` blocks.
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocks<Engine> {
    /// The default action produces zero blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
805
806impl<Engine> Action<Engine> for ProduceBlocks<Engine>
807where
808 Engine: EngineTypes + PayloadTypes,
809 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
810 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
811{
812 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
813 Box::pin(async move {
814 for _ in 0..self.num_blocks {
815 let mut sequence = Sequence::new(vec![
819 Box::new(PickNextBlockProducer::default()),
820 Box::new(GeneratePayloadAttributes::default()),
821 Box::new(GenerateNextPayload::default()),
822 Box::new(BroadcastNextNewPayload::default()),
823 Box::new(UpdateBlockInfoToLatestPayload::default()),
824 ]);
825 sequence.execute(env).await?;
826 }
827 Ok(())
828 })
829 }
830}
831
832#[derive(Debug)]
834pub struct TestFcuToTag {
835 pub tag: String,
837 pub expected_status: PayloadStatusEnum,
839}
840
841impl TestFcuToTag {
842 pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
844 Self { tag: tag.into(), expected_status }
845 }
846}
847
848impl<Engine> Action<Engine> for TestFcuToTag
849where
850 Engine: EngineTypes,
851{
852 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
853 Box::pin(async move {
854 let (target_block, _node_idx) = env
856 .block_registry
857 .get(&self.tag)
858 .copied()
859 .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
860
861 let engine_client = env.node_clients[0].engine.http_client();
862 let fcu_state = ForkchoiceState {
863 head_block_hash: target_block.hash,
864 safe_block_hash: target_block.hash,
865 finalized_block_hash: target_block.hash,
866 };
867
868 let fcu_response =
869 EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
870 .await?;
871
872 match (&fcu_response.payload_status.status, &self.expected_status) {
874 (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
875 debug!("FCU to '{}' returned VALID as expected", self.tag);
876 }
877 (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
878 debug!("FCU to '{}' returned INVALID as expected", self.tag);
879 }
880 (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
881 debug!("FCU to '{}' returned SYNCING as expected", self.tag);
882 }
883 (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
884 debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
885 }
886 (actual, expected) => {
887 return Err(eyre::eyre!(
888 "FCU to '{}': expected status {:?}, but got {:?}",
889 self.tag,
890 expected,
891 actual
892 ));
893 }
894 }
895
896 Ok(())
897 })
898 }
899}
900
901#[derive(Debug)]
903pub struct ExpectFcuStatus {
904 pub target_tag: String,
906 pub expected_status: PayloadStatusEnum,
908}
909
910impl ExpectFcuStatus {
911 pub fn valid(target_tag: impl Into<String>) -> Self {
913 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
914 }
915
916 pub fn invalid(target_tag: impl Into<String>) -> Self {
918 Self {
919 target_tag: target_tag.into(),
920 expected_status: PayloadStatusEnum::Invalid {
921 validation_error: "corrupted block".to_string(),
922 },
923 }
924 }
925
926 pub fn syncing(target_tag: impl Into<String>) -> Self {
928 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
929 }
930
931 pub fn accepted(target_tag: impl Into<String>) -> Self {
933 Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
934 }
935}
936
937impl<Engine> Action<Engine> for ExpectFcuStatus
938where
939 Engine: EngineTypes,
940{
941 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
942 Box::pin(async move {
943 let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
944 test_fcu.execute(env).await
945 })
946 }
947}
948
/// Action that validates a tagged block is still accepted as a valid forkchoice head.
#[derive(Debug)]
pub struct ValidateCanonicalTag {
    /// Tag of the block to validate.
    pub tag: String,
}

impl ValidateCanonicalTag {
    /// Creates the action for the given block tag.
    pub fn new(tag: impl Into<String>) -> Self {
        let tag = tag.into();
        Self { tag }
    }
}
962
963impl<Engine> Action<Engine> for ValidateCanonicalTag
964where
965 Engine: EngineTypes,
966{
967 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
968 Box::pin(async move {
969 let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
970 expect_valid.execute(env).await?;
971
972 debug!("Successfully validated that '{}' remains canonical", self.tag);
973 Ok(())
974 })
975 }
976}
977
/// Action that produces a number of blocks on the active node only, submitting each payload to
/// that node alone.
#[derive(Debug)]
pub struct ProduceBlocksLocally<Engine> {
    /// How many blocks to produce.
    pub num_blocks: u64,
    /// Binds the action to its engine type without holding a value of that type.
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocksLocally<Engine> {
    /// Creates an action that produces `num_blocks` blocks.
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocksLocally<Engine> {
    /// The default action produces zero blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
1000
1001impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1002where
1003 Engine: EngineTypes + PayloadTypes,
1004 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1005 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1006{
1007 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1008 Box::pin(async move {
1009 let producer_idx = env.active_node_idx;
1011
1012 for _ in 0..self.num_blocks {
1013 env.last_producer_idx = Some(producer_idx);
1015
1016 let mut sequence = Sequence::new(vec![
1018 Box::new(GeneratePayloadAttributes::default()),
1020 Box::new(GenerateNextPayload::default()),
1021 Box::new(BroadcastNextNewPayload::with_active_node()),
1023 Box::new(UpdateBlockInfoToLatestPayload::default()),
1024 ]);
1025 sequence.execute(env).await?;
1026 }
1027 Ok(())
1028 })
1029 }
1030}
1031
/// Action that produces a run of blocks in which selected positions are deliberately corrupted.
#[derive(Debug)]
pub struct ProduceInvalidBlocks<Engine> {
    /// Total number of blocks to produce.
    pub num_blocks: u64,
    /// Zero-based positions within the run that are produced as invalid blocks.
    pub invalid_indices: HashSet<u64>,
    /// Binds the action to its engine type without holding a value of that type.
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceInvalidBlocks<Engine> {
    /// Creates an action producing `num_blocks` blocks, corrupting those at `invalid_indices`.
    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
        Self { num_blocks, invalid_indices, _phantom: PhantomData }
    }

    /// Creates an action producing `num_blocks` blocks with a single invalid block at
    /// `invalid_index`.
    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
        Self::new(num_blocks, HashSet::from([invalid_index]))
    }
}
1057
1058impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1059where
1060 Engine: EngineTypes + PayloadTypes,
1061 Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1062 Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1063{
1064 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1065 Box::pin(async move {
1066 for block_index in 0..self.num_blocks {
1067 let is_invalid = self.invalid_indices.contains(&block_index);
1068
1069 if is_invalid {
1070 debug!("Producing invalid block at index {}", block_index);
1071
1072 let mut sequence = Sequence::new(vec![
1074 Box::new(PickNextBlockProducer::default()),
1075 Box::new(GeneratePayloadAttributes::default()),
1076 Box::new(GenerateNextPayload::default()),
1077 ]);
1078 sequence.execute(env).await?;
1079
1080 let latest_envelope =
1082 env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1083 || eyre::eyre!("No payload envelope available to corrupt"),
1084 )?;
1085
1086 let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1087 let mut corrupted_payload = envelope_v3.execution_payload;
1088
1089 corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1091
1092 debug!(
1093 "Corrupted state root for block {} to: {}",
1094 block_index, corrupted_payload.payload_inner.payload_inner.state_root
1095 );
1096
1097 let engine_client = env.node_clients[0].engine.http_client();
1099 let versioned_hashes = Vec::new();
1101 let parent_beacon_block_root = B256::random();
1103
1104 let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1105 &engine_client,
1106 corrupted_payload.clone(),
1107 versioned_hashes,
1108 parent_beacon_block_root,
1109 )
1110 .await?;
1111
1112 match new_payload_response.status {
1114 PayloadStatusEnum::Invalid { validation_error } => {
1115 debug!(
1116 "Block {} correctly rejected as invalid: {:?}",
1117 block_index, validation_error
1118 );
1119 }
1120 other_status => {
1121 return Err(eyre::eyre!(
1122 "Expected block {} to be rejected as INVALID, but got: {:?}",
1123 block_index,
1124 other_status
1125 ));
1126 }
1127 }
1128
1129 env.set_current_block_info(BlockInfo {
1131 hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1132 number: corrupted_payload.payload_inner.payload_inner.block_number,
1133 timestamp: corrupted_payload.timestamp(),
1134 })?;
1135 } else {
1136 debug!("Producing valid block at index {}", block_index);
1137
1138 let mut sequence = Sequence::new(vec![
1140 Box::new(PickNextBlockProducer::default()),
1141 Box::new(GeneratePayloadAttributes::default()),
1142 Box::new(GenerateNextPayload::default()),
1143 Box::new(BroadcastNextNewPayload::default()),
1144 Box::new(UpdateBlockInfoToLatestPayload::default()),
1145 ]);
1146 sequence.execute(env).await?;
1147 }
1148 }
1149 Ok(())
1150 })
1151 }
1152}