reth_e2e_test_utils/testsuite/actions/
produce_blocks.rs

1//! Block production actions for the e2e testing framework.
2
3use crate::testsuite::{
4    actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5    BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9    payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_node_api::{EngineTypes, PayloadTypes};
15use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
16use std::{collections::HashSet, marker::PhantomData, time::Duration};
17use tokio::time::sleep;
18use tracing::debug;
19
20/// Mine a single block with the given transactions and verify the block was created
21/// successfully.
22#[derive(Debug)]
23pub struct AssertMineBlock<Engine>
24where
25    Engine: PayloadTypes,
26{
27    /// The node index to mine
28    pub node_idx: usize,
29    /// Transactions to include in the block
30    pub transactions: Vec<Bytes>,
31    /// Expected block hash (optional)
32    pub expected_hash: Option<B256>,
33    /// Block's payload attributes
34    // TODO: refactor once we have actions to generate payload attributes.
35    pub payload_attributes: Engine::PayloadAttributes,
36    /// Tracks engine type
37    _phantom: PhantomData<Engine>,
38}
39
40impl<Engine> AssertMineBlock<Engine>
41where
42    Engine: PayloadTypes,
43{
44    /// Create a new `AssertMineBlock` action
45    pub fn new(
46        node_idx: usize,
47        transactions: Vec<Bytes>,
48        expected_hash: Option<B256>,
49        payload_attributes: Engine::PayloadAttributes,
50    ) -> Self {
51        Self {
52            node_idx,
53            transactions,
54            expected_hash,
55            payload_attributes,
56            _phantom: Default::default(),
57        }
58    }
59}
60
61impl<Engine> Action<Engine> for AssertMineBlock<Engine>
62where
63    Engine: EngineTypes,
64{
65    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
66        Box::pin(async move {
67            if self.node_idx >= env.node_clients.len() {
68                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
69            }
70
71            let node_client = &env.node_clients[self.node_idx];
72            let rpc_client = &node_client.rpc;
73            let engine_client = node_client.engine.http_client();
74
75            // get the latest block to use as parent
76            let latest_block = EthApiClient::<
77                TransactionRequest,
78                Transaction,
79                Block,
80                Receipt,
81                Header,
82            >::block_by_number(
83                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
84            )
85            .await?;
86
87            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
88            let parent_hash = latest_block.header.hash;
89
90            debug!("Latest block hash: {parent_hash}");
91
92            // create a simple forkchoice state with the latest block as head
93            let fork_choice_state = ForkchoiceState {
94                head_block_hash: parent_hash,
95                safe_block_hash: parent_hash,
96                finalized_block_hash: parent_hash,
97            };
98
99            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
100                &engine_client,
101                fork_choice_state,
102                Some(self.payload_attributes.clone()),
103            )
104            .await?;
105
106            debug!("FCU result: {:?}", fcu_result);
107
108            // check if we got a valid payload ID
109            match fcu_result.payload_status.status {
110                PayloadStatusEnum::Valid => {
111                    if let Some(payload_id) = fcu_result.payload_id {
112                        debug!("Got payload ID: {payload_id}");
113
114                        // get the payload that was built
115                        let _engine_payload =
116                            EngineApiClient::<Engine>::get_payload_v2(&engine_client, payload_id)
117                                .await?;
118                        Ok(())
119                    } else {
120                        Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
121                    }
122                }
123                _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
124            }
125        })
126    }
127}
128
/// Action that selects which node produces the next block, based on the latest block
/// information.
#[derive(Debug, Default)]
pub struct PickNextBlockProducer {}

impl PickNextBlockProducer {
    /// Returns a fresh `PickNextBlockProducer` action.
    pub const fn new() -> Self {
        PickNextBlockProducer {}
    }
}
139
140impl<Engine> Action<Engine> for PickNextBlockProducer
141where
142    Engine: EngineTypes,
143{
144    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
145        Box::pin(async move {
146            let num_clients = env.node_clients.len();
147            if num_clients == 0 {
148                return Err(eyre::eyre!("No node clients available"));
149            }
150
151            let latest_info = env
152                .current_block_info()
153                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
154
155            // simple round-robin selection based on next block number
156            let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
157
158            env.last_producer_idx = Some(next_producer_idx);
159            debug!(
160                "Selected node {} as the next block producer for block {}",
161                next_producer_idx,
162                latest_info.number + 1
163            );
164
165            Ok(())
166        })
167    }
168}
169
170/// Store payload attributes for the next block.
171#[derive(Debug, Default)]
172pub struct GeneratePayloadAttributes {}
173
174impl<Engine> Action<Engine> for GeneratePayloadAttributes
175where
176    Engine: EngineTypes + PayloadTypes,
177    Engine::PayloadAttributes: From<PayloadAttributes>,
178{
179    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
180        Box::pin(async move {
181            let latest_block = env
182                .current_block_info()
183                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
184            let block_number = latest_block.number;
185            let timestamp =
186                env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
187            let payload_attributes = PayloadAttributes {
188                timestamp,
189                prev_randao: B256::random(),
190                suggested_fee_recipient: alloy_primitives::Address::random(),
191                withdrawals: Some(vec![]),
192                parent_beacon_block_root: Some(B256::ZERO),
193            };
194
195            env.active_node_state_mut()?
196                .payload_attributes
197                .insert(latest_block.number + 1, payload_attributes);
198            debug!("Stored payload attributes for block {}", block_number + 1);
199            Ok(())
200        })
201    }
202}
203
204/// Action that generates the next payload
205#[derive(Debug, Default)]
206pub struct GenerateNextPayload {}
207
208impl<Engine> Action<Engine> for GenerateNextPayload
209where
210    Engine: EngineTypes + PayloadTypes,
211    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
212{
213    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
214        Box::pin(async move {
215            let latest_block = env
216                .current_block_info()
217                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
218
219            let parent_hash = latest_block.hash;
220            debug!("Latest block hash: {parent_hash}");
221
222            let fork_choice_state = ForkchoiceState {
223                head_block_hash: parent_hash,
224                safe_block_hash: parent_hash,
225                finalized_block_hash: parent_hash,
226            };
227
228            let payload_attributes = env
229                .active_node_state()?
230                .payload_attributes
231                .get(&(latest_block.number + 1))
232                .cloned()
233                .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
234
235            let producer_idx =
236                env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
237
238            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
239                &env.node_clients[producer_idx].engine.http_client(),
240                fork_choice_state,
241                Some(payload_attributes.clone().into()),
242            )
243            .await?;
244
245            debug!("FCU result: {:?}", fcu_result);
246
247            // validate the FCU status before proceeding
248            // Note: In the context of GenerateNextPayload, Syncing usually means the engine
249            // doesn't have the requested head block, which should be an error
250            expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
251
252            let payload_id = if let Some(payload_id) = fcu_result.payload_id {
253                debug!("Received new payload ID: {:?}", payload_id);
254                payload_id
255            } else {
256                debug!("No payload ID returned, generating fresh payload attributes for forking");
257
258                let fresh_payload_attributes = PayloadAttributes {
259                    timestamp: env.active_node_state()?.latest_header_time +
260                        env.block_timestamp_increment,
261                    prev_randao: B256::random(),
262                    suggested_fee_recipient: alloy_primitives::Address::random(),
263                    withdrawals: Some(vec![]),
264                    parent_beacon_block_root: Some(B256::ZERO),
265                };
266
267                let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
268                    &env.node_clients[producer_idx].engine.http_client(),
269                    fork_choice_state,
270                    Some(fresh_payload_attributes.clone().into()),
271                )
272                .await?;
273
274                debug!("Fresh FCU result: {:?}", fresh_fcu_result);
275
276                // validate the fresh FCU status
277                expect_fcu_not_syncing_or_accepted(
278                    &fresh_fcu_result,
279                    "GenerateNextPayload (fresh)",
280                )?;
281
282                if let Some(payload_id) = fresh_fcu_result.payload_id {
283                    payload_id
284                } else {
285                    debug!("Engine considers the fork base already canonical, skipping payload generation");
286                    return Ok(());
287                }
288            };
289
290            env.active_node_state_mut()?.next_payload_id = Some(payload_id);
291
292            sleep(Duration::from_secs(1)).await;
293
294            let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
295                &env.node_clients[producer_idx].engine.http_client(),
296                payload_id,
297            )
298            .await?;
299
300            // Store the payload attributes that were used to generate this payload
301            let built_payload = payload_attributes.clone();
302            env.active_node_state_mut()?
303                .payload_id_history
304                .insert(latest_block.number + 1, payload_id);
305            env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
306            env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
307
308            Ok(())
309        })
310    }
311}
312
313/// Action that broadcasts the latest fork choice state to all clients
314#[derive(Debug, Default)]
315pub struct BroadcastLatestForkchoice {}
316
317impl<Engine> Action<Engine> for BroadcastLatestForkchoice
318where
319    Engine: EngineTypes + PayloadTypes,
320    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
321    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
322{
323    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
324        Box::pin(async move {
325            if env.node_clients.is_empty() {
326                return Err(eyre::eyre!("No node clients available"));
327            }
328
329            // use the hash of the newly executed payload if available
330            let head_hash = if let Some(payload_envelope) =
331                &env.active_node_state()?.latest_payload_envelope
332            {
333                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
334                    payload_envelope.clone().into();
335                let new_block_hash = execution_payload_envelope
336                    .execution_payload
337                    .payload_inner
338                    .payload_inner
339                    .block_hash;
340                debug!("Using newly executed block hash as head: {new_block_hash}");
341                new_block_hash
342            } else {
343                // fallback to RPC query
344                let rpc_client = &env.node_clients[0].rpc;
345                let current_head_block = EthApiClient::<
346                    TransactionRequest,
347                    Transaction,
348                    Block,
349                    Receipt,
350                    Header,
351                >::block_by_number(
352                    rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
353                )
354                .await?
355                .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
356                debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
357                current_head_block.header.hash
358            };
359
360            let fork_choice_state = ForkchoiceState {
361                head_block_hash: head_hash,
362                safe_block_hash: head_hash,
363                finalized_block_hash: head_hash,
364            };
365            debug!(
366                "Broadcasting forkchoice update to {} clients. Head: {:?}",
367                env.node_clients.len(),
368                fork_choice_state.head_block_hash
369            );
370
371            for (idx, client) in env.node_clients.iter().enumerate() {
372                match EngineApiClient::<Engine>::fork_choice_updated_v3(
373                    &client.engine.http_client(),
374                    fork_choice_state,
375                    None,
376                )
377                .await
378                {
379                    Ok(resp) => {
380                        debug!(
381                            "Client {}: Forkchoice update status: {:?}",
382                            idx, resp.payload_status.status
383                        );
384                        // validate that the forkchoice update was accepted
385                        validate_fcu_response(&resp, &format!("Client {idx}"))?;
386                    }
387                    Err(err) => {
388                        return Err(eyre::eyre!(
389                            "Client {}: Failed to broadcast forkchoice: {:?}",
390                            idx,
391                            err
392                        ));
393                    }
394                }
395            }
396            debug!("Forkchoice update broadcasted successfully");
397            Ok(())
398        })
399    }
400}
401
402/// Action that syncs environment state with the node's canonical chain via RPC.
403///
404/// This queries the latest canonical block from the node and updates the environment
405/// to match. Typically used after forkchoice operations to ensure the environment
406/// is in sync with the node's view of the canonical chain.
407#[derive(Debug, Default)]
408pub struct UpdateBlockInfo {}
409
410impl<Engine> Action<Engine> for UpdateBlockInfo
411where
412    Engine: EngineTypes,
413{
414    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
415        Box::pin(async move {
416            // get the latest block from the first client to update environment state
417            let rpc_client = &env.node_clients[0].rpc;
418            let latest_block = EthApiClient::<
419                TransactionRequest,
420                Transaction,
421                Block,
422                Receipt,
423                Header,
424            >::block_by_number(
425                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
426            )
427            .await?
428            .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
429
430            // update environment with the new block information
431            env.set_current_block_info(BlockInfo {
432                hash: latest_block.header.hash,
433                number: latest_block.header.number,
434                timestamp: latest_block.header.timestamp,
435            })?;
436
437            env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
438            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
439                latest_block.header.hash;
440
441            debug!(
442                "Updated environment to block {} (hash: {})",
443                latest_block.header.number, latest_block.header.hash
444            );
445
446            Ok(())
447        })
448    }
449}
450
451/// Action that updates environment state using the locally produced payload.
452///
453/// This uses the execution payload stored in the environment rather than querying RPC,
454/// making it more efficient and reliable during block production. Preferred over
455/// `UpdateBlockInfo` when we have just produced a block and have the payload available.
456#[derive(Debug, Default)]
457pub struct UpdateBlockInfoToLatestPayload {}
458
459impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
460where
461    Engine: EngineTypes + PayloadTypes,
462    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
463{
464    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
465        Box::pin(async move {
466            let payload_envelope = env
467                .active_node_state()?
468                .latest_payload_envelope
469                .as_ref()
470                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
471
472            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
473                payload_envelope.clone().into();
474            let execution_payload = execution_payload_envelope.execution_payload;
475
476            let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
477            let block_number = execution_payload.payload_inner.payload_inner.block_number;
478            let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
479
480            // update environment with the new block information from the payload
481            env.set_current_block_info(BlockInfo {
482                hash: block_hash,
483                number: block_number,
484                timestamp: block_timestamp,
485            })?;
486
487            env.active_node_state_mut()?.latest_header_time = block_timestamp;
488            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
489
490            debug!(
491                "Updated environment to newly produced block {} (hash: {})",
492                block_number, block_hash
493            );
494
495            Ok(())
496        })
497    }
498}
499
500/// Action that checks whether the broadcasted new payload has been accepted
501#[derive(Debug, Default)]
502pub struct CheckPayloadAccepted {}
503
504impl<Engine> Action<Engine> for CheckPayloadAccepted
505where
506    Engine: EngineTypes,
507    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
508{
509    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
510        Box::pin(async move {
511            let mut accepted_check: bool = false;
512
513            let mut latest_block = env
514                .current_block_info()
515                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
516
517            let payload_id = *env
518                .active_node_state()?
519                .payload_id_history
520                .get(&(latest_block.number + 1))
521                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
522
523            let node_clients = env.node_clients.clone();
524            for (idx, client) in node_clients.iter().enumerate() {
525                let rpc_client = &client.rpc;
526
527                // get the last header by number using latest_head_number
528                let rpc_latest_header = EthApiClient::<
529                    TransactionRequest,
530                    Transaction,
531                    Block,
532                    Receipt,
533                    Header,
534                >::header_by_number(
535                    rpc_client, alloy_eips::BlockNumberOrTag::Latest
536                )
537                .await?
538                .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
539
540                // perform several checks
541                let next_new_payload = env
542                    .active_node_state()?
543                    .latest_payload_built
544                    .as_ref()
545                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
546
547                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
548                    &client.engine.http_client(),
549                    payload_id,
550                )
551                .await?;
552
553                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
554                let new_payload_block_hash = execution_payload_envelope
555                    .execution_payload
556                    .payload_inner
557                    .payload_inner
558                    .block_hash;
559
560                if rpc_latest_header.hash != new_payload_block_hash {
561                    debug!(
562                        "Client {}: The hash is not matched: {:?} {:?}",
563                        idx, rpc_latest_header.hash, new_payload_block_hash
564                    );
565                    continue;
566                }
567
568                if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
569                    debug!(
570                        "Client {}: difficulty != 0: {:?}",
571                        idx, rpc_latest_header.inner.difficulty
572                    );
573                    continue;
574                }
575
576                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
577                    debug!(
578                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
579                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
580                    );
581                    continue;
582                }
583
584                let extra_len = rpc_latest_header.inner.extra_data.len();
585                if extra_len <= 32 {
586                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
587                    continue;
588                }
589
590                // at least one client passes all the check, save the header in Env
591                if !accepted_check {
592                    accepted_check = true;
593                    // save the current block info in Env
594                    env.set_current_block_info(BlockInfo {
595                        hash: rpc_latest_header.hash,
596                        number: rpc_latest_header.inner.number,
597                        timestamp: rpc_latest_header.inner.timestamp,
598                    })?;
599
600                    // align latest header time and forkchoice state with the accepted canonical
601                    // head
602                    env.active_node_state_mut()?.latest_header_time =
603                        rpc_latest_header.inner.timestamp;
604                    env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
605                        rpc_latest_header.hash;
606
607                    // update local copy for any further usage in this scope
608                    latest_block.hash = rpc_latest_header.hash;
609                    latest_block.number = rpc_latest_header.inner.number;
610                }
611            }
612
613            if accepted_check {
614                Ok(())
615            } else {
616                Err(eyre::eyre!("No clients passed payload acceptance checks"))
617            }
618        })
619    }
620}
621
/// Action that delivers the next new payload to the nodes.
#[derive(Debug, Default)]
pub struct BroadcastNextNewPayload {
    /// When `true` the payload goes to the active node only; when `false` (the default) it is
    /// broadcast to all nodes.
    active_node_only: bool,
}

impl BroadcastNextNewPayload {
    /// Returns a `BroadcastNextNewPayload` action restricted to the active node.
    pub const fn with_active_node() -> Self {
        BroadcastNextNewPayload { active_node_only: true }
    }
}
635
636impl<Engine> Action<Engine> for BroadcastNextNewPayload
637where
638    Engine: EngineTypes + PayloadTypes,
639    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
640    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
641{
642    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
643        Box::pin(async move {
644            // Get the next new payload to broadcast
645            let next_new_payload = env
646                .active_node_state()?
647                .latest_payload_built
648                .as_ref()
649                .ok_or_else(|| eyre::eyre!("No next built payload found"))?
650                .clone();
651            let parent_beacon_block_root = next_new_payload
652                .parent_beacon_block_root
653                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
654
655            let payload_envelope = env
656                .active_node_state()?
657                .latest_payload_envelope
658                .as_ref()
659                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
660                .clone();
661
662            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
663            let execution_payload = execution_payload_envelope.execution_payload;
664
665            if self.active_node_only {
666                // Send only to the active node
667                let active_idx = env.active_node_idx;
668                let engine = env.node_clients[active_idx].engine.http_client();
669
670                let result = EngineApiClient::<Engine>::new_payload_v3(
671                    &engine,
672                    execution_payload.clone(),
673                    vec![],
674                    parent_beacon_block_root,
675                )
676                .await?;
677
678                debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
679
680                // Validate the response
681                match result.status {
682                    PayloadStatusEnum::Valid => {
683                        env.active_node_state_mut()?.latest_payload_executed =
684                            Some(next_new_payload);
685                        Ok(())
686                    }
687                    other => Err(eyre::eyre!(
688                        "Active node {}: Unexpected payload status: {:?}",
689                        active_idx,
690                        other
691                    )),
692                }
693            } else {
694                // Loop through all clients and broadcast the next new payload
695                let mut broadcast_results = Vec::new();
696                let mut first_valid_seen = false;
697
698                for (idx, client) in env.node_clients.iter().enumerate() {
699                    let engine = client.engine.http_client();
700
701                    // Broadcast the execution payload
702                    let result = EngineApiClient::<Engine>::new_payload_v3(
703                        &engine,
704                        execution_payload.clone(),
705                        vec![],
706                        parent_beacon_block_root,
707                    )
708                    .await?;
709
710                    broadcast_results.push((idx, result.status.clone()));
711                    debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
712
713                    // Check if this node accepted the payload
714                    if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
715                        first_valid_seen = true;
716                    } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
717                        debug!(
718                            "Node {}: Invalid payload status returned from broadcast: {:?}",
719                            idx, validation_error
720                        );
721                    }
722                }
723
724                // Update the executed payload state after broadcasting to all nodes
725                if first_valid_seen {
726                    env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
727                }
728
729                // Check if at least one node accepted the payload
730                let any_valid =
731                    broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
732                if !any_valid {
733                    return Err(eyre::eyre!(
734                        "Failed to successfully broadcast payload to any client"
735                    ));
736                }
737
738                debug!("Broadcast complete. Results: {:?}", broadcast_results);
739
740                Ok(())
741            }
742        })
743    }
744}
745
/// Action that runs the block production steps a given number of times using the available
/// clients.
#[derive(Debug)]
pub struct ProduceBlocks<Engine> {
    /// How many blocks to produce
    pub num_blocks: u64,
    /// Marker binding the action to its engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocks<Engine> {
    /// Returns a `ProduceBlocks` action that produces `num_blocks` blocks.
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocks<Engine> {
    /// The default action produces no blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
767
768impl<Engine> Action<Engine> for ProduceBlocks<Engine>
769where
770    Engine: EngineTypes + PayloadTypes,
771    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
772    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
773{
774    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
775        Box::pin(async move {
776            for _ in 0..self.num_blocks {
777                // create a fresh sequence for each block to avoid state pollution
778                // Note: This produces blocks but does NOT make them canonical
779                // Use MakeCanonical action explicitly if canonicalization is needed
780                let mut sequence = Sequence::new(vec![
781                    Box::new(PickNextBlockProducer::default()),
782                    Box::new(GeneratePayloadAttributes::default()),
783                    Box::new(GenerateNextPayload::default()),
784                    Box::new(BroadcastNextNewPayload::default()),
785                    Box::new(UpdateBlockInfoToLatestPayload::default()),
786                ]);
787                sequence.execute(env).await?;
788            }
789            Ok(())
790        })
791    }
792}
793
794/// Action to test forkchoice update to a tagged block with expected status
795#[derive(Debug)]
796pub struct TestFcuToTag {
797    /// Tag name of the target block
798    pub tag: String,
799    /// Expected payload status
800    pub expected_status: PayloadStatusEnum,
801}
802
803impl TestFcuToTag {
804    /// Create a new `TestFcuToTag` action
805    pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
806        Self { tag: tag.into(), expected_status }
807    }
808}
809
810impl<Engine> Action<Engine> for TestFcuToTag
811where
812    Engine: EngineTypes,
813{
814    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
815        Box::pin(async move {
816            // get the target block from the registry
817            let (target_block, _node_idx) = env
818                .block_registry
819                .get(&self.tag)
820                .copied()
821                .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
822
823            let engine_client = env.node_clients[0].engine.http_client();
824            let fcu_state = ForkchoiceState {
825                head_block_hash: target_block.hash,
826                safe_block_hash: target_block.hash,
827                finalized_block_hash: target_block.hash,
828            };
829
830            let fcu_response =
831                EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
832                    .await?;
833
834            // validate the response matches expected status
835            match (&fcu_response.payload_status.status, &self.expected_status) {
836                (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
837                    debug!("FCU to '{}' returned VALID as expected", self.tag);
838                }
839                (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
840                    debug!("FCU to '{}' returned INVALID as expected", self.tag);
841                }
842                (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
843                    debug!("FCU to '{}' returned SYNCING as expected", self.tag);
844                }
845                (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
846                    debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
847                }
848                (actual, expected) => {
849                    return Err(eyre::eyre!(
850                        "FCU to '{}': expected status {:?}, but got {:?}",
851                        self.tag,
852                        expected,
853                        actual
854                    ));
855                }
856            }
857
858            Ok(())
859        })
860    }
861}
862
863/// Action to expect a specific FCU status when targeting a tagged block
864#[derive(Debug)]
865pub struct ExpectFcuStatus {
866    /// Tag name of the target block
867    pub target_tag: String,
868    /// Expected payload status
869    pub expected_status: PayloadStatusEnum,
870}
871
872impl ExpectFcuStatus {
873    /// Create a new `ExpectFcuStatus` action expecting VALID status
874    pub fn valid(target_tag: impl Into<String>) -> Self {
875        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
876    }
877
878    /// Create a new `ExpectFcuStatus` action expecting INVALID status
879    pub fn invalid(target_tag: impl Into<String>) -> Self {
880        Self {
881            target_tag: target_tag.into(),
882            expected_status: PayloadStatusEnum::Invalid {
883                validation_error: "corrupted block".to_string(),
884            },
885        }
886    }
887
888    /// Create a new `ExpectFcuStatus` action expecting SYNCING status
889    pub fn syncing(target_tag: impl Into<String>) -> Self {
890        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
891    }
892
893    /// Create a new `ExpectFcuStatus` action expecting ACCEPTED status
894    pub fn accepted(target_tag: impl Into<String>) -> Self {
895        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
896    }
897}
898
899impl<Engine> Action<Engine> for ExpectFcuStatus
900where
901    Engine: EngineTypes,
902{
903    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
904        Box::pin(async move {
905            let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
906            test_fcu.execute(env).await
907        })
908    }
909}
910
/// Action that checks a tagged block is still canonical by issuing a forkchoice update to it
#[derive(Debug)]
pub struct ValidateCanonicalTag {
    /// Registry tag of the block expected to remain canonical
    pub tag: String,
}

impl ValidateCanonicalTag {
    /// Build a `ValidateCanonicalTag` action for the block registered under `tag`
    pub fn new(tag: impl Into<String>) -> Self {
        let tag = tag.into();
        Self { tag }
    }
}
924
925impl<Engine> Action<Engine> for ValidateCanonicalTag
926where
927    Engine: EngineTypes,
928{
929    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
930        Box::pin(async move {
931            let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
932            expect_valid.execute(env).await?;
933
934            debug!("Successfully validated that '{}' remains canonical", self.tag);
935            Ok(())
936        })
937    }
938}
939
/// Action that produces blocks locally, without broadcasting them to the other nodes.
/// The payload is sent only to the active node so that it is available there.
#[derive(Debug)]
pub struct ProduceBlocksLocally<Engine> {
    /// How many blocks the action produces when executed
    pub num_blocks: u64,
    /// Zero-sized marker binding the action to an engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocksLocally<Engine> {
    /// Build a `ProduceBlocksLocally` action that will produce `num_blocks` blocks
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocksLocally<Engine> {
    /// The default action produces zero blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
962
963impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
964where
965    Engine: EngineTypes + PayloadTypes,
966    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
967    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
968{
969    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
970        Box::pin(async move {
971            // Remember the active node to ensure all blocks are produced on the same node
972            let producer_idx = env.active_node_idx;
973
974            for _ in 0..self.num_blocks {
975                // Ensure we always use the same producer
976                env.last_producer_idx = Some(producer_idx);
977
978                // create a sequence that produces blocks and sends only to active node
979                let mut sequence = Sequence::new(vec![
980                    // Skip PickNextBlockProducer to maintain the same producer
981                    Box::new(GeneratePayloadAttributes::default()),
982                    Box::new(GenerateNextPayload::default()),
983                    // Send payload only to the active node to make it available
984                    Box::new(BroadcastNextNewPayload::with_active_node()),
985                    Box::new(UpdateBlockInfoToLatestPayload::default()),
986                ]);
987                sequence.execute(env).await?;
988            }
989            Ok(())
990        })
991    }
992}
993
/// Action that produces a run of blocks in which selected positions are deliberately invalid
#[derive(Debug)]
pub struct ProduceInvalidBlocks<Engine> {
    /// Total number of blocks in the run
    pub num_blocks: u64,
    /// Zero-based positions within the run whose blocks should be made invalid
    pub invalid_indices: HashSet<u64>,
    /// Zero-sized marker binding the action to an engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceInvalidBlocks<Engine> {
    /// Build a `ProduceInvalidBlocks` action from a block count and a set of invalid positions
    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
        Self { num_blocks, invalid_indices, _phantom: PhantomData }
    }

    /// Build a `ProduceInvalidBlocks` action whose only invalid block sits at `invalid_index`
    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
        Self::new(num_blocks, HashSet::from([invalid_index]))
    }
}
1019
1020impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1021where
1022    Engine: EngineTypes + PayloadTypes,
1023    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1024    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1025{
1026    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1027        Box::pin(async move {
1028            for block_index in 0..self.num_blocks {
1029                let is_invalid = self.invalid_indices.contains(&block_index);
1030
1031                if is_invalid {
1032                    debug!("Producing invalid block at index {}", block_index);
1033
1034                    // produce a valid block first, then corrupt it
1035                    let mut sequence = Sequence::new(vec![
1036                        Box::new(PickNextBlockProducer::default()),
1037                        Box::new(GeneratePayloadAttributes::default()),
1038                        Box::new(GenerateNextPayload::default()),
1039                    ]);
1040                    sequence.execute(env).await?;
1041
1042                    // get the latest payload and corrupt it
1043                    let latest_envelope =
1044                        env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1045                            || eyre::eyre!("No payload envelope available to corrupt"),
1046                        )?;
1047
1048                    let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1049                    let mut corrupted_payload = envelope_v3.execution_payload;
1050
1051                    // corrupt the state root to make the block invalid
1052                    corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1053
1054                    debug!(
1055                        "Corrupted state root for block {} to: {}",
1056                        block_index, corrupted_payload.payload_inner.payload_inner.state_root
1057                    );
1058
1059                    // send the corrupted payload via newPayload
1060                    let engine_client = env.node_clients[0].engine.http_client();
1061                    // for simplicity, we'll use empty versioned hashes for invalid block testing
1062                    let versioned_hashes = Vec::new();
1063                    // use a random parent beacon block root since this is for invalid block testing
1064                    let parent_beacon_block_root = B256::random();
1065
1066                    let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1067                        &engine_client,
1068                        corrupted_payload.clone(),
1069                        versioned_hashes,
1070                        parent_beacon_block_root,
1071                    )
1072                    .await?;
1073
1074                    // expect the payload to be rejected as invalid
1075                    match new_payload_response.status {
1076                        PayloadStatusEnum::Invalid { validation_error } => {
1077                            debug!(
1078                                "Block {} correctly rejected as invalid: {:?}",
1079                                block_index, validation_error
1080                            );
1081                        }
1082                        other_status => {
1083                            return Err(eyre::eyre!(
1084                                "Expected block {} to be rejected as INVALID, but got: {:?}",
1085                                block_index,
1086                                other_status
1087                            ));
1088                        }
1089                    }
1090
1091                    // update block info with the corrupted block (for potential future reference)
1092                    env.set_current_block_info(BlockInfo {
1093                        hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1094                        number: corrupted_payload.payload_inner.payload_inner.block_number,
1095                        timestamp: corrupted_payload.timestamp(),
1096                    })?;
1097                } else {
1098                    debug!("Producing valid block at index {}", block_index);
1099
1100                    // produce a valid block normally
1101                    let mut sequence = Sequence::new(vec![
1102                        Box::new(PickNextBlockProducer::default()),
1103                        Box::new(GeneratePayloadAttributes::default()),
1104                        Box::new(GenerateNextPayload::default()),
1105                        Box::new(BroadcastNextNewPayload::default()),
1106                        Box::new(UpdateBlockInfoToLatestPayload::default()),
1107                    ]);
1108                    sequence.execute(env).await?;
1109                }
1110            }
1111            Ok(())
1112        })
1113    }
1114}