Skip to main content

reth_e2e_test_utils/testsuite/actions/
produce_blocks.rs

1//! Block production actions for the e2e testing framework.
2
3use crate::testsuite::{
4    actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5    BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9    payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21/// Mine a single block with the given transactions and verify the block was created
22/// successfully.
23#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26    Engine: PayloadTypes,
27{
28    /// The node index to mine
29    pub node_idx: usize,
30    /// Transactions to include in the block
31    pub transactions: Vec<Bytes>,
32    /// Expected block hash (optional)
33    pub expected_hash: Option<B256>,
34    /// Block's payload attributes
35    // TODO: refactor once we have actions to generate payload attributes.
36    pub payload_attributes: Engine::PayloadAttributes,
37    /// Tracks engine type
38    _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43    Engine: PayloadTypes,
44{
45    /// Create a new `AssertMineBlock` action
46    pub fn new(
47        node_idx: usize,
48        transactions: Vec<Bytes>,
49        expected_hash: Option<B256>,
50        payload_attributes: Engine::PayloadAttributes,
51    ) -> Self {
52        Self {
53            node_idx,
54            transactions,
55            expected_hash,
56            payload_attributes,
57            _phantom: Default::default(),
58        }
59    }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64    Engine: EngineTypes,
65{
66    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67        Box::pin(async move {
68            if self.node_idx >= env.node_clients.len() {
69                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70            }
71
72            let node_client = &env.node_clients[self.node_idx];
73            let rpc_client = &node_client.rpc;
74            let engine_client = node_client.engine.http_client();
75
76            // get the latest block to use as parent
77            let latest_block = EthApiClient::<
78                TransactionRequest,
79                Transaction,
80                Block,
81                Receipt,
82                Header,
83                TransactionSigned,
84            >::block_by_number(
85                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86            )
87            .await?;
88
89            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90            let parent_hash = latest_block.header.hash;
91
92            debug!("Latest block hash: {parent_hash}");
93
94            // create a simple forkchoice state with the latest block as head
95            let fork_choice_state = ForkchoiceState {
96                head_block_hash: parent_hash,
97                safe_block_hash: parent_hash,
98                finalized_block_hash: parent_hash,
99            };
100
101            // Try v2 first for backwards compatibility, fall back to v3 on error.
102            match EngineApiClient::<Engine>::fork_choice_updated_v2(
103                &engine_client,
104                fork_choice_state,
105                Some(self.payload_attributes.clone()),
106            )
107            .await
108            {
109                Ok(fcu_result) => {
110                    debug!(?fcu_result, "FCU v2 result");
111                    match fcu_result.payload_status.status {
112                        PayloadStatusEnum::Valid => {
113                            if let Some(payload_id) = fcu_result.payload_id {
114                                debug!(id=%payload_id, "Got payload");
115                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116                                    &engine_client,
117                                    payload_id,
118                                )
119                                .await?;
120                                Ok(())
121                            } else {
122                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123                            }
124                        }
125                        _ => Err(eyre::eyre!(
126                            "Payload status not valid: {:?}",
127                            fcu_result.payload_status
128                        ))?,
129                    }
130                }
131                Err(_) => {
132                    // If v2 fails due to unsupported fork/missing fields, try v3
133                    let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134                        &engine_client,
135                        fork_choice_state,
136                        Some(self.payload_attributes.clone()),
137                    )
138                    .await?;
139
140                    debug!(?fcu_result, "FCU v3 result");
141                    match fcu_result.payload_status.status {
142                        PayloadStatusEnum::Valid => {
143                            if let Some(payload_id) = fcu_result.payload_id {
144                                debug!(id=%payload_id, "Got payload");
145                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146                                    &engine_client,
147                                    payload_id,
148                                )
149                                .await?;
150                                Ok(())
151                            } else {
152                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153                            }
154                        }
155                        _ => Err(eyre::eyre!(
156                            "Payload status not valid: {:?}",
157                            fcu_result.payload_status
158                        )),
159                    }
160                }
161            }
162        })
163    }
164}
165
166/// Pick the next block producer based on the latest block information.
167#[derive(Debug, Default)]
168pub struct PickNextBlockProducer {}
169
170impl PickNextBlockProducer {
171    /// Create a new `PickNextBlockProducer` action
172    pub const fn new() -> Self {
173        Self {}
174    }
175}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179    Engine: EngineTypes,
180{
181    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182        Box::pin(async move {
183            let num_clients = env.node_clients.len();
184            if num_clients == 0 {
185                return Err(eyre::eyre!("No node clients available"));
186            }
187
188            let latest_info = env
189                .current_block_info()
190                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192            // simple round-robin selection based on next block number
193            let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195            env.last_producer_idx = Some(next_producer_idx);
196            debug!(
197                "Selected node {} as the next block producer for block {}",
198                next_producer_idx,
199                latest_info.number + 1
200            );
201
202            Ok(())
203        })
204    }
205}
206
207/// Store payload attributes for the next block.
208#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213    Engine: EngineTypes + PayloadTypes,
214    Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217        Box::pin(async move {
218            let latest_block = env
219                .current_block_info()
220                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221            let block_number = latest_block.number;
222            let timestamp =
223                env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224            let payload_attributes = PayloadAttributes {
225                timestamp,
226                prev_randao: B256::random(),
227                suggested_fee_recipient: alloy_primitives::Address::random(),
228                withdrawals: Some(vec![]),
229                parent_beacon_block_root: Some(B256::ZERO),
230                slot_number: None,
231            };
232
233            env.active_node_state_mut()?
234                .payload_attributes
235                .insert(latest_block.number + 1, payload_attributes);
236            debug!("Stored payload attributes for block {}", block_number + 1);
237            Ok(())
238        })
239    }
240}
241
242/// Action that generates the next payload
243#[derive(Debug, Default)]
244pub struct GenerateNextPayload {}
245
246impl<Engine> Action<Engine> for GenerateNextPayload
247where
248    Engine: EngineTypes + PayloadTypes,
249    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
250{
251    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
252        Box::pin(async move {
253            let latest_block = env
254                .current_block_info()
255                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
256
257            let parent_hash = latest_block.hash;
258            debug!("Latest block hash: {parent_hash}");
259
260            let fork_choice_state = ForkchoiceState {
261                head_block_hash: parent_hash,
262                safe_block_hash: parent_hash,
263                finalized_block_hash: parent_hash,
264            };
265
266            let payload_attributes = env
267                .active_node_state()?
268                .payload_attributes
269                .get(&(latest_block.number + 1))
270                .cloned()
271                .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
272
273            let producer_idx =
274                env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
275
276            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
277                &env.node_clients[producer_idx].engine.http_client(),
278                fork_choice_state,
279                Some(payload_attributes.clone().into()),
280            )
281            .await?;
282
283            debug!("FCU result: {:?}", fcu_result);
284
285            // validate the FCU status before proceeding
286            // Note: In the context of GenerateNextPayload, Syncing usually means the engine
287            // doesn't have the requested head block, which should be an error
288            expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
289
290            let payload_id = if let Some(payload_id) = fcu_result.payload_id {
291                debug!("Received new payload ID: {:?}", payload_id);
292                payload_id
293            } else {
294                debug!("No payload ID returned, generating fresh payload attributes for forking");
295
296                let fresh_payload_attributes = PayloadAttributes {
297                    timestamp: env.active_node_state()?.latest_header_time +
298                        env.block_timestamp_increment,
299                    prev_randao: B256::random(),
300                    suggested_fee_recipient: alloy_primitives::Address::random(),
301                    withdrawals: Some(vec![]),
302                    parent_beacon_block_root: Some(B256::ZERO),
303                    slot_number: None,
304                };
305
306                let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
307                    &env.node_clients[producer_idx].engine.http_client(),
308                    fork_choice_state,
309                    Some(fresh_payload_attributes.clone().into()),
310                )
311                .await?;
312
313                debug!("Fresh FCU result: {:?}", fresh_fcu_result);
314
315                // validate the fresh FCU status
316                expect_fcu_not_syncing_or_accepted(
317                    &fresh_fcu_result,
318                    "GenerateNextPayload (fresh)",
319                )?;
320
321                if let Some(payload_id) = fresh_fcu_result.payload_id {
322                    payload_id
323                } else {
324                    debug!("Engine considers the fork base already canonical, skipping payload generation");
325                    return Ok(());
326                }
327            };
328
329            env.active_node_state_mut()?.next_payload_id = Some(payload_id);
330
331            sleep(Duration::from_secs(1)).await;
332
333            let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
334                &env.node_clients[producer_idx].engine.http_client(),
335                payload_id,
336            )
337            .await?;
338
339            // Store the payload attributes that were used to generate this payload
340            let built_payload = payload_attributes.clone();
341            env.active_node_state_mut()?
342                .payload_id_history
343                .insert(latest_block.number + 1, payload_id);
344            env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
345            env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
346
347            Ok(())
348        })
349    }
350}
351
352/// Action that broadcasts the latest fork choice state to all clients
353#[derive(Debug, Default)]
354pub struct BroadcastLatestForkchoice {}
355
356impl<Engine> Action<Engine> for BroadcastLatestForkchoice
357where
358    Engine: EngineTypes + PayloadTypes,
359    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
360    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
361{
362    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
363        Box::pin(async move {
364            if env.node_clients.is_empty() {
365                return Err(eyre::eyre!("No node clients available"));
366            }
367
368            // use the hash of the newly executed payload if available
369            let head_hash = if let Some(payload_envelope) =
370                &env.active_node_state()?.latest_payload_envelope
371            {
372                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
373                    payload_envelope.clone().into();
374                let new_block_hash = execution_payload_envelope
375                    .execution_payload
376                    .payload_inner
377                    .payload_inner
378                    .block_hash;
379                debug!("Using newly executed block hash as head: {new_block_hash}");
380                new_block_hash
381            } else {
382                // fallback to RPC query
383                let rpc_client = &env.node_clients[0].rpc;
384                let current_head_block = EthApiClient::<
385                    TransactionRequest,
386                    Transaction,
387                    Block,
388                    Receipt,
389                    Header,
390                    TransactionSigned,
391                >::block_by_number(
392                    rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
393                )
394                .await?
395                .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
396                debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
397                current_head_block.header.hash
398            };
399
400            let fork_choice_state = ForkchoiceState {
401                head_block_hash: head_hash,
402                safe_block_hash: head_hash,
403                finalized_block_hash: head_hash,
404            };
405            debug!(
406                "Broadcasting forkchoice update to {} clients. Head: {:?}",
407                env.node_clients.len(),
408                fork_choice_state.head_block_hash
409            );
410
411            for (idx, client) in env.node_clients.iter().enumerate() {
412                match EngineApiClient::<Engine>::fork_choice_updated_v3(
413                    &client.engine.http_client(),
414                    fork_choice_state,
415                    None,
416                )
417                .await
418                {
419                    Ok(resp) => {
420                        debug!(
421                            "Client {}: Forkchoice update status: {:?}",
422                            idx, resp.payload_status.status
423                        );
424                        // validate that the forkchoice update was accepted
425                        validate_fcu_response(&resp, &format!("Client {idx}"))?;
426                    }
427                    Err(err) => {
428                        return Err(eyre::eyre!(
429                            "Client {}: Failed to broadcast forkchoice: {:?}",
430                            idx,
431                            err
432                        ));
433                    }
434                }
435            }
436            debug!("Forkchoice update broadcasted successfully");
437            Ok(())
438        })
439    }
440}
441
442/// Action that syncs environment state with the node's canonical chain via RPC.
443///
444/// This queries the latest canonical block from the node and updates the environment
445/// to match. Typically used after forkchoice operations to ensure the environment
446/// is in sync with the node's view of the canonical chain.
447#[derive(Debug, Default)]
448pub struct UpdateBlockInfo {}
449
450impl<Engine> Action<Engine> for UpdateBlockInfo
451where
452    Engine: EngineTypes,
453{
454    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
455        Box::pin(async move {
456            // get the latest block from the first client to update environment state
457            let rpc_client = &env.node_clients[0].rpc;
458            let latest_block = EthApiClient::<
459                TransactionRequest,
460                Transaction,
461                Block,
462                Receipt,
463                Header,
464                TransactionSigned,
465            >::block_by_number(
466                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
467            )
468            .await?
469            .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
470
471            // update environment with the new block information
472            env.set_current_block_info(BlockInfo {
473                hash: latest_block.header.hash,
474                number: latest_block.header.number,
475                timestamp: latest_block.header.timestamp,
476            })?;
477
478            env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
479            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
480                latest_block.header.hash;
481
482            debug!(
483                "Updated environment to block {} (hash: {})",
484                latest_block.header.number, latest_block.header.hash
485            );
486
487            Ok(())
488        })
489    }
490}
491
492/// Action that updates environment state using the locally produced payload.
493///
494/// This uses the execution payload stored in the environment rather than querying RPC,
495/// making it more efficient and reliable during block production. Preferred over
496/// `UpdateBlockInfo` when we have just produced a block and have the payload available.
497#[derive(Debug, Default)]
498pub struct UpdateBlockInfoToLatestPayload {}
499
500impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
501where
502    Engine: EngineTypes + PayloadTypes,
503    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
504{
505    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
506        Box::pin(async move {
507            let payload_envelope = env
508                .active_node_state()?
509                .latest_payload_envelope
510                .as_ref()
511                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
512
513            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
514                payload_envelope.clone().into();
515            let execution_payload = execution_payload_envelope.execution_payload;
516
517            let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
518            let block_number = execution_payload.payload_inner.payload_inner.block_number;
519            let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
520
521            // update environment with the new block information from the payload
522            env.set_current_block_info(BlockInfo {
523                hash: block_hash,
524                number: block_number,
525                timestamp: block_timestamp,
526            })?;
527
528            env.active_node_state_mut()?.latest_header_time = block_timestamp;
529            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
530
531            debug!(
532                "Updated environment to newly produced block {} (hash: {})",
533                block_number, block_hash
534            );
535
536            Ok(())
537        })
538    }
539}
540
541/// Action that checks whether the broadcasted new payload has been accepted
542#[derive(Debug, Default)]
543pub struct CheckPayloadAccepted {}
544
545impl<Engine> Action<Engine> for CheckPayloadAccepted
546where
547    Engine: EngineTypes,
548    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
549{
550    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
551        Box::pin(async move {
552            let mut accepted_check: bool = false;
553
554            let latest_block = env
555                .current_block_info()
556                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
557
558            let payload_id = *env
559                .active_node_state()?
560                .payload_id_history
561                .get(&(latest_block.number + 1))
562                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
563
564            let node_clients = env.node_clients.clone();
565            for (idx, client) in node_clients.iter().enumerate() {
566                let rpc_client = &client.rpc;
567
568                // get the last header by number using latest_head_number
569                let rpc_latest_header = EthApiClient::<
570                    TransactionRequest,
571                    Transaction,
572                    Block,
573                    Receipt,
574                    Header,
575                    TransactionSigned,
576                >::header_by_number(
577                    rpc_client, alloy_eips::BlockNumberOrTag::Latest
578                )
579                .await?
580                .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
581
582                // perform several checks
583                let next_new_payload = env
584                    .active_node_state()?
585                    .latest_payload_built
586                    .as_ref()
587                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
588
589                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
590                    &client.engine.http_client(),
591                    payload_id,
592                )
593                .await?;
594
595                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
596                let new_payload_block_hash = execution_payload_envelope
597                    .execution_payload
598                    .payload_inner
599                    .payload_inner
600                    .block_hash;
601
602                if rpc_latest_header.hash != new_payload_block_hash {
603                    debug!(
604                        "Client {}: The hash is not matched: {:?} {:?}",
605                        idx, rpc_latest_header.hash, new_payload_block_hash
606                    );
607                    continue;
608                }
609
610                if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
611                    debug!(
612                        "Client {}: difficulty != 0: {:?}",
613                        idx, rpc_latest_header.inner.difficulty
614                    );
615                    continue;
616                }
617
618                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
619                    debug!(
620                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
621                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
622                    );
623                    continue;
624                }
625
626                let extra_len = rpc_latest_header.inner.extra_data.len();
627                if extra_len <= 32 {
628                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
629                    continue;
630                }
631
632                // at least one client passes all the check, save the header in Env
633                if !accepted_check {
634                    accepted_check = true;
635                    // save the current block info in Env
636                    env.set_current_block_info(BlockInfo {
637                        hash: rpc_latest_header.hash,
638                        number: rpc_latest_header.inner.number,
639                        timestamp: rpc_latest_header.inner.timestamp,
640                    })?;
641
642                    // align latest header time and forkchoice state with the accepted canonical
643                    // head
644                    env.active_node_state_mut()?.latest_header_time =
645                        rpc_latest_header.inner.timestamp;
646                    env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
647                        rpc_latest_header.hash;
648                }
649            }
650
651            if accepted_check {
652                Ok(())
653            } else {
654                Err(eyre::eyre!("No clients passed payload acceptance checks"))
655            }
656        })
657    }
658}
659
660/// Action that broadcasts the next new payload
661#[derive(Debug, Default)]
662pub struct BroadcastNextNewPayload {
663    /// If true, only send to the active node. If false, broadcast to all nodes.
664    active_node_only: bool,
665}
666
667impl BroadcastNextNewPayload {
668    /// Create a new `BroadcastNextNewPayload` action that only sends to the active node
669    pub const fn with_active_node() -> Self {
670        Self { active_node_only: true }
671    }
672}
673
674impl<Engine> Action<Engine> for BroadcastNextNewPayload
675where
676    Engine: EngineTypes + PayloadTypes,
677    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
678    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
679{
680    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
681        Box::pin(async move {
682            // Get the next new payload to broadcast
683            let next_new_payload = env
684                .active_node_state()?
685                .latest_payload_built
686                .as_ref()
687                .ok_or_else(|| eyre::eyre!("No next built payload found"))?
688                .clone();
689            let parent_beacon_block_root = next_new_payload
690                .parent_beacon_block_root
691                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
692
693            let payload_envelope = env
694                .active_node_state()?
695                .latest_payload_envelope
696                .as_ref()
697                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
698                .clone();
699
700            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
701            let execution_payload = execution_payload_envelope.execution_payload;
702
703            if self.active_node_only {
704                // Send only to the active node
705                let active_idx = env.active_node_idx;
706                let engine = env.node_clients[active_idx].engine.http_client();
707
708                let result = EngineApiClient::<Engine>::new_payload_v3(
709                    &engine,
710                    execution_payload.clone(),
711                    vec![],
712                    parent_beacon_block_root,
713                )
714                .await?;
715
716                debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
717
718                // Validate the response
719                match result.status {
720                    PayloadStatusEnum::Valid => {
721                        env.active_node_state_mut()?.latest_payload_executed =
722                            Some(next_new_payload);
723                        Ok(())
724                    }
725                    other => Err(eyre::eyre!(
726                        "Active node {}: Unexpected payload status: {:?}",
727                        active_idx,
728                        other
729                    )),
730                }
731            } else {
732                // Loop through all clients and broadcast the next new payload
733                let mut broadcast_results = Vec::new();
734                let mut first_valid_seen = false;
735
736                for (idx, client) in env.node_clients.iter().enumerate() {
737                    let engine = client.engine.http_client();
738
739                    // Broadcast the execution payload
740                    let result = EngineApiClient::<Engine>::new_payload_v3(
741                        &engine,
742                        execution_payload.clone(),
743                        vec![],
744                        parent_beacon_block_root,
745                    )
746                    .await?;
747
748                    broadcast_results.push((idx, result.status.clone()));
749                    debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
750
751                    // Check if this node accepted the payload
752                    if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
753                        first_valid_seen = true;
754                    } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
755                        debug!(
756                            "Node {}: Invalid payload status returned from broadcast: {:?}",
757                            idx, validation_error
758                        );
759                    }
760                }
761
762                // Update the executed payload state after broadcasting to all nodes
763                if first_valid_seen {
764                    env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
765                }
766
767                // Check if at least one node accepted the payload
768                let any_valid =
769                    broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
770                if !any_valid {
771                    return Err(eyre::eyre!(
772                        "Failed to successfully broadcast payload to any client"
773                    ));
774                }
775
776                debug!("Broadcast complete. Results: {:?}", broadcast_results);
777
778                Ok(())
779            }
780        })
781    }
782}
783
784/// Action that produces a sequence of blocks using the available clients
785#[derive(Debug)]
786pub struct ProduceBlocks<Engine> {
787    /// Number of blocks to produce
788    pub num_blocks: u64,
789    /// Tracks engine type
790    _phantom: PhantomData<Engine>,
791}
792
793impl<Engine> ProduceBlocks<Engine> {
794    /// Create a new `ProduceBlocks` action
795    pub fn new(num_blocks: u64) -> Self {
796        Self { num_blocks, _phantom: Default::default() }
797    }
798}
799
800impl<Engine> Default for ProduceBlocks<Engine> {
801    fn default() -> Self {
802        Self::new(0)
803    }
804}
805
806impl<Engine> Action<Engine> for ProduceBlocks<Engine>
807where
808    Engine: EngineTypes + PayloadTypes,
809    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
810    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
811{
812    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
813        Box::pin(async move {
814            for _ in 0..self.num_blocks {
815                // create a fresh sequence for each block to avoid state pollution
816                // Note: This produces blocks but does NOT make them canonical
817                // Use MakeCanonical action explicitly if canonicalization is needed
818                let mut sequence = Sequence::new(vec![
819                    Box::new(PickNextBlockProducer::default()),
820                    Box::new(GeneratePayloadAttributes::default()),
821                    Box::new(GenerateNextPayload::default()),
822                    Box::new(BroadcastNextNewPayload::default()),
823                    Box::new(UpdateBlockInfoToLatestPayload::default()),
824                ]);
825                sequence.execute(env).await?;
826            }
827            Ok(())
828        })
829    }
830}
831
832/// Action to test forkchoice update to a tagged block with expected status
833#[derive(Debug)]
834pub struct TestFcuToTag {
835    /// Tag name of the target block
836    pub tag: String,
837    /// Expected payload status
838    pub expected_status: PayloadStatusEnum,
839}
840
841impl TestFcuToTag {
842    /// Create a new `TestFcuToTag` action
843    pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
844        Self { tag: tag.into(), expected_status }
845    }
846}
847
848impl<Engine> Action<Engine> for TestFcuToTag
849where
850    Engine: EngineTypes,
851{
852    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
853        Box::pin(async move {
854            // get the target block from the registry
855            let (target_block, _node_idx) = env
856                .block_registry
857                .get(&self.tag)
858                .copied()
859                .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
860
861            let engine_client = env.node_clients[0].engine.http_client();
862            let fcu_state = ForkchoiceState {
863                head_block_hash: target_block.hash,
864                safe_block_hash: target_block.hash,
865                finalized_block_hash: target_block.hash,
866            };
867
868            let fcu_response =
869                EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
870                    .await?;
871
872            // validate the response matches expected status
873            match (&fcu_response.payload_status.status, &self.expected_status) {
874                (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
875                    debug!("FCU to '{}' returned VALID as expected", self.tag);
876                }
877                (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
878                    debug!("FCU to '{}' returned INVALID as expected", self.tag);
879                }
880                (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
881                    debug!("FCU to '{}' returned SYNCING as expected", self.tag);
882                }
883                (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
884                    debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
885                }
886                (actual, expected) => {
887                    return Err(eyre::eyre!(
888                        "FCU to '{}': expected status {:?}, but got {:?}",
889                        self.tag,
890                        expected,
891                        actual
892                    ));
893                }
894            }
895
896            Ok(())
897        })
898    }
899}
900
901/// Action to expect a specific FCU status when targeting a tagged block
902#[derive(Debug)]
903pub struct ExpectFcuStatus {
904    /// Tag name of the target block
905    pub target_tag: String,
906    /// Expected payload status
907    pub expected_status: PayloadStatusEnum,
908}
909
910impl ExpectFcuStatus {
911    /// Create a new `ExpectFcuStatus` action expecting VALID status
912    pub fn valid(target_tag: impl Into<String>) -> Self {
913        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
914    }
915
916    /// Create a new `ExpectFcuStatus` action expecting INVALID status
917    pub fn invalid(target_tag: impl Into<String>) -> Self {
918        Self {
919            target_tag: target_tag.into(),
920            expected_status: PayloadStatusEnum::Invalid {
921                validation_error: "corrupted block".to_string(),
922            },
923        }
924    }
925
926    /// Create a new `ExpectFcuStatus` action expecting SYNCING status
927    pub fn syncing(target_tag: impl Into<String>) -> Self {
928        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
929    }
930
931    /// Create a new `ExpectFcuStatus` action expecting ACCEPTED status
932    pub fn accepted(target_tag: impl Into<String>) -> Self {
933        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
934    }
935}
936
937impl<Engine> Action<Engine> for ExpectFcuStatus
938where
939    Engine: EngineTypes,
940{
941    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
942        Box::pin(async move {
943            let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
944            test_fcu.execute(env).await
945        })
946    }
947}
948
949/// Action to validate that a tagged block remains canonical by performing FCU to it
950#[derive(Debug)]
951pub struct ValidateCanonicalTag {
952    /// Tag name of the block to validate as canonical
953    pub tag: String,
954}
955
956impl ValidateCanonicalTag {
957    /// Create a new `ValidateCanonicalTag` action
958    pub fn new(tag: impl Into<String>) -> Self {
959        Self { tag: tag.into() }
960    }
961}
962
963impl<Engine> Action<Engine> for ValidateCanonicalTag
964where
965    Engine: EngineTypes,
966{
967    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
968        Box::pin(async move {
969            let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
970            expect_valid.execute(env).await?;
971
972            debug!("Successfully validated that '{}' remains canonical", self.tag);
973            Ok(())
974        })
975    }
976}
977
978/// Action that produces blocks locally without broadcasting to other nodes
979/// This sends the payload only to the active node to ensure it's available locally
980#[derive(Debug)]
981pub struct ProduceBlocksLocally<Engine> {
982    /// Number of blocks to produce
983    pub num_blocks: u64,
984    /// Tracks engine type
985    _phantom: PhantomData<Engine>,
986}
987
988impl<Engine> ProduceBlocksLocally<Engine> {
989    /// Create a new `ProduceBlocksLocally` action
990    pub fn new(num_blocks: u64) -> Self {
991        Self { num_blocks, _phantom: Default::default() }
992    }
993}
994
995impl<Engine> Default for ProduceBlocksLocally<Engine> {
996    fn default() -> Self {
997        Self::new(0)
998    }
999}
1000
1001impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1002where
1003    Engine: EngineTypes + PayloadTypes,
1004    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1005    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1006{
1007    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1008        Box::pin(async move {
1009            // Remember the active node to ensure all blocks are produced on the same node
1010            let producer_idx = env.active_node_idx;
1011
1012            for _ in 0..self.num_blocks {
1013                // Ensure we always use the same producer
1014                env.last_producer_idx = Some(producer_idx);
1015
1016                // create a sequence that produces blocks and sends only to active node
1017                let mut sequence = Sequence::new(vec![
1018                    // Skip PickNextBlockProducer to maintain the same producer
1019                    Box::new(GeneratePayloadAttributes::default()),
1020                    Box::new(GenerateNextPayload::default()),
1021                    // Send payload only to the active node to make it available
1022                    Box::new(BroadcastNextNewPayload::with_active_node()),
1023                    Box::new(UpdateBlockInfoToLatestPayload::default()),
1024                ]);
1025                sequence.execute(env).await?;
1026            }
1027            Ok(())
1028        })
1029    }
1030}
1031
1032/// Action that produces a sequence of blocks where some blocks are intentionally invalid
1033#[derive(Debug)]
1034pub struct ProduceInvalidBlocks<Engine> {
1035    /// Number of blocks to produce
1036    pub num_blocks: u64,
1037    /// Set of indices (0-based) where blocks should be made invalid
1038    pub invalid_indices: HashSet<u64>,
1039    /// Tracks engine type
1040    _phantom: PhantomData<Engine>,
1041}
1042
1043impl<Engine> ProduceInvalidBlocks<Engine> {
1044    /// Create a new `ProduceInvalidBlocks` action
1045    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
1046        Self { num_blocks, invalid_indices, _phantom: Default::default() }
1047    }
1048
1049    /// Create a new `ProduceInvalidBlocks` action with a single invalid block at the specified
1050    /// index
1051    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
1052        let mut invalid_indices = HashSet::new();
1053        invalid_indices.insert(invalid_index);
1054        Self::new(num_blocks, invalid_indices)
1055    }
1056}
1057
1058impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1059where
1060    Engine: EngineTypes + PayloadTypes,
1061    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1062    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1063{
1064    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1065        Box::pin(async move {
1066            for block_index in 0..self.num_blocks {
1067                let is_invalid = self.invalid_indices.contains(&block_index);
1068
1069                if is_invalid {
1070                    debug!("Producing invalid block at index {}", block_index);
1071
1072                    // produce a valid block first, then corrupt it
1073                    let mut sequence = Sequence::new(vec![
1074                        Box::new(PickNextBlockProducer::default()),
1075                        Box::new(GeneratePayloadAttributes::default()),
1076                        Box::new(GenerateNextPayload::default()),
1077                    ]);
1078                    sequence.execute(env).await?;
1079
1080                    // get the latest payload and corrupt it
1081                    let latest_envelope =
1082                        env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1083                            || eyre::eyre!("No payload envelope available to corrupt"),
1084                        )?;
1085
1086                    let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1087                    let mut corrupted_payload = envelope_v3.execution_payload;
1088
1089                    // corrupt the state root to make the block invalid
1090                    corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1091
1092                    debug!(
1093                        "Corrupted state root for block {} to: {}",
1094                        block_index, corrupted_payload.payload_inner.payload_inner.state_root
1095                    );
1096
1097                    // send the corrupted payload via newPayload
1098                    let engine_client = env.node_clients[0].engine.http_client();
1099                    // for simplicity, we'll use empty versioned hashes for invalid block testing
1100                    let versioned_hashes = Vec::new();
1101                    // use a random parent beacon block root since this is for invalid block testing
1102                    let parent_beacon_block_root = B256::random();
1103
1104                    let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1105                        &engine_client,
1106                        corrupted_payload.clone(),
1107                        versioned_hashes,
1108                        parent_beacon_block_root,
1109                    )
1110                    .await?;
1111
1112                    // expect the payload to be rejected as invalid
1113                    match new_payload_response.status {
1114                        PayloadStatusEnum::Invalid { validation_error } => {
1115                            debug!(
1116                                "Block {} correctly rejected as invalid: {:?}",
1117                                block_index, validation_error
1118                            );
1119                        }
1120                        other_status => {
1121                            return Err(eyre::eyre!(
1122                                "Expected block {} to be rejected as INVALID, but got: {:?}",
1123                                block_index,
1124                                other_status
1125                            ));
1126                        }
1127                    }
1128
1129                    // update block info with the corrupted block (for potential future reference)
1130                    env.set_current_block_info(BlockInfo {
1131                        hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1132                        number: corrupted_payload.payload_inner.payload_inner.block_number,
1133                        timestamp: corrupted_payload.timestamp(),
1134                    })?;
1135                } else {
1136                    debug!("Producing valid block at index {}", block_index);
1137
1138                    // produce a valid block normally
1139                    let mut sequence = Sequence::new(vec![
1140                        Box::new(PickNextBlockProducer::default()),
1141                        Box::new(GeneratePayloadAttributes::default()),
1142                        Box::new(GenerateNextPayload::default()),
1143                        Box::new(BroadcastNextNewPayload::default()),
1144                        Box::new(UpdateBlockInfoToLatestPayload::default()),
1145                    ]);
1146                    sequence.execute(env).await?;
1147                }
1148            }
1149            Ok(())
1150        })
1151    }
1152}