Skip to main content

reth_e2e_test_utils/testsuite/actions/
produce_blocks.rs

1//! Block production actions for the e2e testing framework.
2
3use crate::testsuite::{
4    actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5    BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9    payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21/// Mine a single block with the given transactions and verify the block was created
22/// successfully.
23#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26    Engine: PayloadTypes,
27{
28    /// The node index to mine
29    pub node_idx: usize,
30    /// Transactions to include in the block
31    pub transactions: Vec<Bytes>,
32    /// Expected block hash (optional)
33    pub expected_hash: Option<B256>,
34    /// Block's payload attributes
35    // TODO: refactor once we have actions to generate payload attributes.
36    pub payload_attributes: Engine::PayloadAttributes,
37    /// Tracks engine type
38    _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43    Engine: PayloadTypes,
44{
45    /// Create a new `AssertMineBlock` action
46    pub fn new(
47        node_idx: usize,
48        transactions: Vec<Bytes>,
49        expected_hash: Option<B256>,
50        payload_attributes: Engine::PayloadAttributes,
51    ) -> Self {
52        Self {
53            node_idx,
54            transactions,
55            expected_hash,
56            payload_attributes,
57            _phantom: Default::default(),
58        }
59    }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64    Engine: EngineTypes,
65{
66    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67        Box::pin(async move {
68            if self.node_idx >= env.node_clients.len() {
69                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70            }
71
72            let node_client = &env.node_clients[self.node_idx];
73            let rpc_client = &node_client.rpc;
74            let engine_client = node_client.engine.http_client();
75
76            // get the latest block to use as parent
77            let latest_block = EthApiClient::<
78                TransactionRequest,
79                Transaction,
80                Block,
81                Receipt,
82                Header,
83                TransactionSigned,
84            >::block_by_number(
85                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86            )
87            .await?;
88
89            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90            let parent_hash = latest_block.header.hash;
91
92            debug!("Latest block hash: {parent_hash}");
93
94            // create a simple forkchoice state with the latest block as head
95            let fork_choice_state = ForkchoiceState {
96                head_block_hash: parent_hash,
97                safe_block_hash: parent_hash,
98                finalized_block_hash: parent_hash,
99            };
100
101            // Try v2 first for backwards compatibility, fall back to v3 on error.
102            match EngineApiClient::<Engine>::fork_choice_updated_v2(
103                &engine_client,
104                fork_choice_state,
105                Some(self.payload_attributes.clone()),
106            )
107            .await
108            {
109                Ok(fcu_result) => {
110                    debug!(?fcu_result, "FCU v2 result");
111                    match fcu_result.payload_status.status {
112                        PayloadStatusEnum::Valid => {
113                            if let Some(payload_id) = fcu_result.payload_id {
114                                debug!(id=%payload_id, "Got payload");
115                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116                                    &engine_client,
117                                    payload_id,
118                                )
119                                .await?;
120                                Ok(())
121                            } else {
122                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123                            }
124                        }
125                        _ => Err(eyre::eyre!(
126                            "Payload status not valid: {:?}",
127                            fcu_result.payload_status
128                        ))?,
129                    }
130                }
131                Err(_) => {
132                    // If v2 fails due to unsupported fork/missing fields, try v3
133                    let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134                        &engine_client,
135                        fork_choice_state,
136                        Some(self.payload_attributes.clone()),
137                    )
138                    .await?;
139
140                    debug!(?fcu_result, "FCU v3 result");
141                    match fcu_result.payload_status.status {
142                        PayloadStatusEnum::Valid => {
143                            if let Some(payload_id) = fcu_result.payload_id {
144                                debug!(id=%payload_id, "Got payload");
145                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146                                    &engine_client,
147                                    payload_id,
148                                )
149                                .await?;
150                                Ok(())
151                            } else {
152                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153                            }
154                        }
155                        _ => Err(eyre::eyre!(
156                            "Payload status not valid: {:?}",
157                            fcu_result.payload_status
158                        )),
159                    }
160                }
161            }
162        })
163    }
164}
165
166/// Pick the next block producer based on the latest block information.
167#[derive(Debug, Default)]
168pub struct PickNextBlockProducer {}
169
170impl PickNextBlockProducer {
171    /// Create a new `PickNextBlockProducer` action
172    pub const fn new() -> Self {
173        Self {}
174    }
175}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179    Engine: EngineTypes,
180{
181    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182        Box::pin(async move {
183            let num_clients = env.node_clients.len();
184            if num_clients == 0 {
185                return Err(eyre::eyre!("No node clients available"));
186            }
187
188            let latest_info = env
189                .current_block_info()
190                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192            // simple round-robin selection based on next block number
193            let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195            env.last_producer_idx = Some(next_producer_idx);
196            debug!(
197                "Selected node {} as the next block producer for block {}",
198                next_producer_idx,
199                latest_info.number + 1
200            );
201
202            Ok(())
203        })
204    }
205}
206
207/// Store payload attributes for the next block.
208#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213    Engine: EngineTypes + PayloadTypes,
214    Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217        Box::pin(async move {
218            let latest_block = env
219                .current_block_info()
220                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221            let block_number = latest_block.number;
222            let timestamp =
223                env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224            let payload_attributes = PayloadAttributes {
225                timestamp,
226                prev_randao: B256::random(),
227                suggested_fee_recipient: alloy_primitives::Address::random(),
228                withdrawals: Some(vec![]),
229                parent_beacon_block_root: Some(B256::ZERO),
230                slot_number: None,
231                target_gas_limit: None,
232            };
233
234            env.active_node_state_mut()?
235                .payload_attributes
236                .insert(latest_block.number + 1, payload_attributes);
237            debug!("Stored payload attributes for block {}", block_number + 1);
238            Ok(())
239        })
240    }
241}
242
243/// Action that generates the next payload
244#[derive(Debug, Default)]
245pub struct GenerateNextPayload {}
246
247impl<Engine> Action<Engine> for GenerateNextPayload
248where
249    Engine: EngineTypes + PayloadTypes,
250    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
251{
252    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
253        Box::pin(async move {
254            let latest_block = env
255                .current_block_info()
256                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
257
258            let parent_hash = latest_block.hash;
259            debug!("Latest block hash: {parent_hash}");
260
261            let fork_choice_state = ForkchoiceState {
262                head_block_hash: parent_hash,
263                safe_block_hash: parent_hash,
264                finalized_block_hash: parent_hash,
265            };
266
267            let payload_attributes = env
268                .active_node_state()?
269                .payload_attributes
270                .get(&(latest_block.number + 1))
271                .cloned()
272                .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
273
274            let producer_idx =
275                env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
276
277            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
278                &env.node_clients[producer_idx].engine.http_client(),
279                fork_choice_state,
280                Some(payload_attributes.clone().into()),
281            )
282            .await?;
283
284            debug!("FCU result: {:?}", fcu_result);
285
286            // validate the FCU status before proceeding
287            // Note: In the context of GenerateNextPayload, Syncing usually means the engine
288            // doesn't have the requested head block, which should be an error
289            expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
290
291            let payload_id = if let Some(payload_id) = fcu_result.payload_id {
292                debug!("Received new payload ID: {:?}", payload_id);
293                payload_id
294            } else {
295                debug!("No payload ID returned, generating fresh payload attributes for forking");
296
297                let fresh_payload_attributes = PayloadAttributes {
298                    timestamp: env.active_node_state()?.latest_header_time +
299                        env.block_timestamp_increment,
300                    prev_randao: B256::random(),
301                    suggested_fee_recipient: alloy_primitives::Address::random(),
302                    withdrawals: Some(vec![]),
303                    parent_beacon_block_root: Some(B256::ZERO),
304                    slot_number: None,
305                    target_gas_limit: None,
306                };
307
308                let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
309                    &env.node_clients[producer_idx].engine.http_client(),
310                    fork_choice_state,
311                    Some(fresh_payload_attributes.clone().into()),
312                )
313                .await?;
314
315                debug!("Fresh FCU result: {:?}", fresh_fcu_result);
316
317                // validate the fresh FCU status
318                expect_fcu_not_syncing_or_accepted(
319                    &fresh_fcu_result,
320                    "GenerateNextPayload (fresh)",
321                )?;
322
323                if let Some(payload_id) = fresh_fcu_result.payload_id {
324                    payload_id
325                } else {
326                    debug!("Engine considers the fork base already canonical, skipping payload generation");
327                    return Ok(());
328                }
329            };
330
331            env.active_node_state_mut()?.next_payload_id = Some(payload_id);
332
333            sleep(Duration::from_secs(1)).await;
334
335            let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
336                &env.node_clients[producer_idx].engine.http_client(),
337                payload_id,
338            )
339            .await?;
340
341            // Store the payload attributes that were used to generate this payload
342            let built_payload = payload_attributes.clone();
343            env.active_node_state_mut()?
344                .payload_id_history
345                .insert(latest_block.number + 1, payload_id);
346            env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
347            env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
348
349            Ok(())
350        })
351    }
352}
353
354/// Action that broadcasts the latest fork choice state to all clients
355#[derive(Debug, Default)]
356pub struct BroadcastLatestForkchoice {}
357
358impl<Engine> Action<Engine> for BroadcastLatestForkchoice
359where
360    Engine: EngineTypes + PayloadTypes,
361    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
362    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
363{
364    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
365        Box::pin(async move {
366            if env.node_clients.is_empty() {
367                return Err(eyre::eyre!("No node clients available"));
368            }
369
370            // use the hash of the newly executed payload if available
371            let head_hash = if let Some(payload_envelope) =
372                &env.active_node_state()?.latest_payload_envelope
373            {
374                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
375                    payload_envelope.clone().into();
376                let new_block_hash = execution_payload_envelope
377                    .execution_payload
378                    .payload_inner
379                    .payload_inner
380                    .block_hash;
381                debug!("Using newly executed block hash as head: {new_block_hash}");
382                new_block_hash
383            } else {
384                // fallback to RPC query
385                let rpc_client = &env.node_clients[0].rpc;
386                let current_head_block = EthApiClient::<
387                    TransactionRequest,
388                    Transaction,
389                    Block,
390                    Receipt,
391                    Header,
392                    TransactionSigned,
393                >::block_by_number(
394                    rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
395                )
396                .await?
397                .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
398                debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
399                current_head_block.header.hash
400            };
401
402            let fork_choice_state = ForkchoiceState {
403                head_block_hash: head_hash,
404                safe_block_hash: head_hash,
405                finalized_block_hash: head_hash,
406            };
407            debug!(
408                "Broadcasting forkchoice update to {} clients. Head: {:?}",
409                env.node_clients.len(),
410                fork_choice_state.head_block_hash
411            );
412
413            for (idx, client) in env.node_clients.iter().enumerate() {
414                match EngineApiClient::<Engine>::fork_choice_updated_v3(
415                    &client.engine.http_client(),
416                    fork_choice_state,
417                    None,
418                )
419                .await
420                {
421                    Ok(resp) => {
422                        debug!(
423                            "Client {}: Forkchoice update status: {:?}",
424                            idx, resp.payload_status.status
425                        );
426                        // validate that the forkchoice update was accepted
427                        validate_fcu_response(&resp, &format!("Client {idx}"))?;
428                    }
429                    Err(err) => {
430                        return Err(eyre::eyre!(
431                            "Client {}: Failed to broadcast forkchoice: {:?}",
432                            idx,
433                            err
434                        ));
435                    }
436                }
437            }
438            debug!("Forkchoice update broadcasted successfully");
439            Ok(())
440        })
441    }
442}
443
444/// Action that syncs environment state with the node's canonical chain via RPC.
445///
446/// This queries the latest canonical block from the node and updates the environment
447/// to match. Typically used after forkchoice operations to ensure the environment
448/// is in sync with the node's view of the canonical chain.
449#[derive(Debug, Default)]
450pub struct UpdateBlockInfo {}
451
452impl<Engine> Action<Engine> for UpdateBlockInfo
453where
454    Engine: EngineTypes,
455{
456    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
457        Box::pin(async move {
458            // get the latest block from the first client to update environment state
459            let rpc_client = &env.node_clients[0].rpc;
460            let latest_block = EthApiClient::<
461                TransactionRequest,
462                Transaction,
463                Block,
464                Receipt,
465                Header,
466                TransactionSigned,
467            >::block_by_number(
468                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
469            )
470            .await?
471            .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
472
473            // update environment with the new block information
474            env.set_current_block_info(BlockInfo {
475                hash: latest_block.header.hash,
476                number: latest_block.header.number,
477                timestamp: latest_block.header.timestamp,
478            })?;
479
480            env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
481            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
482                latest_block.header.hash;
483
484            debug!(
485                "Updated environment to block {} (hash: {})",
486                latest_block.header.number, latest_block.header.hash
487            );
488
489            Ok(())
490        })
491    }
492}
493
494/// Action that updates environment state using the locally produced payload.
495///
496/// This uses the execution payload stored in the environment rather than querying RPC,
497/// making it more efficient and reliable during block production. Preferred over
498/// `UpdateBlockInfo` when we have just produced a block and have the payload available.
499#[derive(Debug, Default)]
500pub struct UpdateBlockInfoToLatestPayload {}
501
502impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
503where
504    Engine: EngineTypes + PayloadTypes,
505    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
506{
507    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
508        Box::pin(async move {
509            let payload_envelope = env
510                .active_node_state()?
511                .latest_payload_envelope
512                .as_ref()
513                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
514
515            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
516                payload_envelope.clone().into();
517            let execution_payload = execution_payload_envelope.execution_payload;
518
519            let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
520            let block_number = execution_payload.payload_inner.payload_inner.block_number;
521            let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
522
523            // update environment with the new block information from the payload
524            env.set_current_block_info(BlockInfo {
525                hash: block_hash,
526                number: block_number,
527                timestamp: block_timestamp,
528            })?;
529
530            env.active_node_state_mut()?.latest_header_time = block_timestamp;
531            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
532
533            debug!(
534                "Updated environment to newly produced block {} (hash: {})",
535                block_number, block_hash
536            );
537
538            Ok(())
539        })
540    }
541}
542
543/// Action that checks whether the broadcasted new payload has been accepted
544#[derive(Debug, Default)]
545pub struct CheckPayloadAccepted {}
546
547impl<Engine> Action<Engine> for CheckPayloadAccepted
548where
549    Engine: EngineTypes,
550    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
551{
552    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
553        Box::pin(async move {
554            let mut accepted_check: bool = false;
555
556            let latest_block = env
557                .current_block_info()
558                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
559
560            let payload_id = *env
561                .active_node_state()?
562                .payload_id_history
563                .get(&(latest_block.number + 1))
564                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
565
566            let node_clients = env.node_clients.clone();
567            for (idx, client) in node_clients.iter().enumerate() {
568                let rpc_client = &client.rpc;
569
570                // get the last header by number using latest_head_number
571                let rpc_latest_header = EthApiClient::<
572                    TransactionRequest,
573                    Transaction,
574                    Block,
575                    Receipt,
576                    Header,
577                    TransactionSigned,
578                >::header_by_number(
579                    rpc_client, alloy_eips::BlockNumberOrTag::Latest
580                )
581                .await?
582                .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
583
584                // perform several checks
585                let next_new_payload = env
586                    .active_node_state()?
587                    .latest_payload_built
588                    .as_ref()
589                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
590
591                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
592                    &client.engine.http_client(),
593                    payload_id,
594                )
595                .await?;
596
597                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
598                let new_payload_block_hash = execution_payload_envelope
599                    .execution_payload
600                    .payload_inner
601                    .payload_inner
602                    .block_hash;
603
604                if rpc_latest_header.hash != new_payload_block_hash {
605                    debug!(
606                        "Client {}: The hash is not matched: {:?} {:?}",
607                        idx, rpc_latest_header.hash, new_payload_block_hash
608                    );
609                    continue;
610                }
611
612                if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
613                    debug!(
614                        "Client {}: difficulty != 0: {:?}",
615                        idx, rpc_latest_header.inner.difficulty
616                    );
617                    continue;
618                }
619
620                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
621                    debug!(
622                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
623                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
624                    );
625                    continue;
626                }
627
628                let extra_len = rpc_latest_header.inner.extra_data.len();
629                if extra_len <= 32 {
630                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
631                    continue;
632                }
633
634                // at least one client passes all the check, save the header in Env
635                if !accepted_check {
636                    accepted_check = true;
637                    // save the current block info in Env
638                    env.set_current_block_info(BlockInfo {
639                        hash: rpc_latest_header.hash,
640                        number: rpc_latest_header.inner.number,
641                        timestamp: rpc_latest_header.inner.timestamp,
642                    })?;
643
644                    // align latest header time and forkchoice state with the accepted canonical
645                    // head
646                    env.active_node_state_mut()?.latest_header_time =
647                        rpc_latest_header.inner.timestamp;
648                    env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
649                        rpc_latest_header.hash;
650                }
651            }
652
653            if accepted_check {
654                Ok(())
655            } else {
656                Err(eyre::eyre!("No clients passed payload acceptance checks"))
657            }
658        })
659    }
660}
661
662/// Action that broadcasts the next new payload
663#[derive(Debug, Default)]
664pub struct BroadcastNextNewPayload {
665    /// If true, only send to the active node. If false, broadcast to all nodes.
666    active_node_only: bool,
667}
668
669impl BroadcastNextNewPayload {
670    /// Create a new `BroadcastNextNewPayload` action that only sends to the active node
671    pub const fn with_active_node() -> Self {
672        Self { active_node_only: true }
673    }
674}
675
676impl<Engine> Action<Engine> for BroadcastNextNewPayload
677where
678    Engine: EngineTypes + PayloadTypes,
679    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
680    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
681{
682    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
683        Box::pin(async move {
684            // Get the next new payload to broadcast
685            let next_new_payload = env
686                .active_node_state()?
687                .latest_payload_built
688                .as_ref()
689                .ok_or_else(|| eyre::eyre!("No next built payload found"))?
690                .clone();
691            let parent_beacon_block_root = next_new_payload
692                .parent_beacon_block_root
693                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
694
695            let payload_envelope = env
696                .active_node_state()?
697                .latest_payload_envelope
698                .as_ref()
699                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
700                .clone();
701
702            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
703            let execution_payload = execution_payload_envelope.execution_payload;
704
705            if self.active_node_only {
706                // Send only to the active node
707                let active_idx = env.active_node_idx;
708                let engine = env.node_clients[active_idx].engine.http_client();
709
710                let result = EngineApiClient::<Engine>::new_payload_v3(
711                    &engine,
712                    execution_payload.clone(),
713                    vec![],
714                    parent_beacon_block_root,
715                )
716                .await?;
717
718                debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
719
720                // Validate the response
721                match result.status {
722                    PayloadStatusEnum::Valid => {
723                        env.active_node_state_mut()?.latest_payload_executed =
724                            Some(next_new_payload);
725                        Ok(())
726                    }
727                    other => Err(eyre::eyre!(
728                        "Active node {}: Unexpected payload status: {:?}",
729                        active_idx,
730                        other
731                    )),
732                }
733            } else {
734                // Loop through all clients and broadcast the next new payload
735                let mut broadcast_results = Vec::new();
736                let mut first_valid_seen = false;
737
738                for (idx, client) in env.node_clients.iter().enumerate() {
739                    let engine = client.engine.http_client();
740
741                    // Broadcast the execution payload
742                    let result = EngineApiClient::<Engine>::new_payload_v3(
743                        &engine,
744                        execution_payload.clone(),
745                        vec![],
746                        parent_beacon_block_root,
747                    )
748                    .await?;
749
750                    broadcast_results.push((idx, result.status.clone()));
751                    debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
752
753                    // Check if this node accepted the payload
754                    if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
755                        first_valid_seen = true;
756                    } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
757                        debug!(
758                            "Node {}: Invalid payload status returned from broadcast: {:?}",
759                            idx, validation_error
760                        );
761                    }
762                }
763
764                // Update the executed payload state after broadcasting to all nodes
765                if first_valid_seen {
766                    env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
767                }
768
769                // Check if at least one node accepted the payload
770                let any_valid =
771                    broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
772                if !any_valid {
773                    return Err(eyre::eyre!(
774                        "Failed to successfully broadcast payload to any client"
775                    ));
776                }
777
778                debug!("Broadcast complete. Results: {:?}", broadcast_results);
779
780                Ok(())
781            }
782        })
783    }
784}
785
786/// Action that produces a sequence of blocks using the available clients
787#[derive(Debug)]
788pub struct ProduceBlocks<Engine> {
789    /// Number of blocks to produce
790    pub num_blocks: u64,
791    /// Tracks engine type
792    _phantom: PhantomData<Engine>,
793}
794
795impl<Engine> ProduceBlocks<Engine> {
796    /// Create a new `ProduceBlocks` action
797    pub fn new(num_blocks: u64) -> Self {
798        Self { num_blocks, _phantom: Default::default() }
799    }
800}
801
802impl<Engine> Default for ProduceBlocks<Engine> {
803    fn default() -> Self {
804        Self::new(0)
805    }
806}
807
808impl<Engine> Action<Engine> for ProduceBlocks<Engine>
809where
810    Engine: EngineTypes + PayloadTypes,
811    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
812    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
813{
814    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
815        Box::pin(async move {
816            for _ in 0..self.num_blocks {
817                // create a fresh sequence for each block to avoid state pollution
818                // Note: This produces blocks but does NOT make them canonical
819                // Use MakeCanonical action explicitly if canonicalization is needed
820                let mut sequence = Sequence::new(vec![
821                    Box::new(PickNextBlockProducer::default()),
822                    Box::new(GeneratePayloadAttributes::default()),
823                    Box::new(GenerateNextPayload::default()),
824                    Box::new(BroadcastNextNewPayload::default()),
825                    Box::new(UpdateBlockInfoToLatestPayload::default()),
826                ]);
827                sequence.execute(env).await?;
828            }
829            Ok(())
830        })
831    }
832}
833
834/// Action to test forkchoice update to a tagged block with expected status
835#[derive(Debug)]
836pub struct TestFcuToTag {
837    /// Tag name of the target block
838    pub tag: String,
839    /// Expected payload status
840    pub expected_status: PayloadStatusEnum,
841}
842
843impl TestFcuToTag {
844    /// Create a new `TestFcuToTag` action
845    pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
846        Self { tag: tag.into(), expected_status }
847    }
848}
849
850impl<Engine> Action<Engine> for TestFcuToTag
851where
852    Engine: EngineTypes,
853{
854    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
855        Box::pin(async move {
856            // get the target block from the registry
857            let (target_block, _node_idx) = env
858                .block_registry
859                .get(&self.tag)
860                .copied()
861                .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
862
863            let engine_client = env.node_clients[0].engine.http_client();
864            let fcu_state = ForkchoiceState {
865                head_block_hash: target_block.hash,
866                safe_block_hash: target_block.hash,
867                finalized_block_hash: target_block.hash,
868            };
869
870            let fcu_response =
871                EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
872                    .await?;
873
874            // validate the response matches expected status
875            match (&fcu_response.payload_status.status, &self.expected_status) {
876                (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
877                    debug!("FCU to '{}' returned VALID as expected", self.tag);
878                }
879                (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
880                    debug!("FCU to '{}' returned INVALID as expected", self.tag);
881                }
882                (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
883                    debug!("FCU to '{}' returned SYNCING as expected", self.tag);
884                }
885                (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
886                    debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
887                }
888                (actual, expected) => {
889                    return Err(eyre::eyre!(
890                        "FCU to '{}': expected status {:?}, but got {:?}",
891                        self.tag,
892                        expected,
893                        actual
894                    ));
895                }
896            }
897
898            Ok(())
899        })
900    }
901}
902
903/// Action to expect a specific FCU status when targeting a tagged block
904#[derive(Debug)]
905pub struct ExpectFcuStatus {
906    /// Tag name of the target block
907    pub target_tag: String,
908    /// Expected payload status
909    pub expected_status: PayloadStatusEnum,
910}
911
912impl ExpectFcuStatus {
913    /// Create a new `ExpectFcuStatus` action expecting VALID status
914    pub fn valid(target_tag: impl Into<String>) -> Self {
915        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
916    }
917
918    /// Create a new `ExpectFcuStatus` action expecting INVALID status
919    pub fn invalid(target_tag: impl Into<String>) -> Self {
920        Self {
921            target_tag: target_tag.into(),
922            expected_status: PayloadStatusEnum::Invalid {
923                validation_error: "corrupted block".to_string(),
924            },
925        }
926    }
927
928    /// Create a new `ExpectFcuStatus` action expecting SYNCING status
929    pub fn syncing(target_tag: impl Into<String>) -> Self {
930        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
931    }
932
933    /// Create a new `ExpectFcuStatus` action expecting ACCEPTED status
934    pub fn accepted(target_tag: impl Into<String>) -> Self {
935        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
936    }
937}
938
939impl<Engine> Action<Engine> for ExpectFcuStatus
940where
941    Engine: EngineTypes,
942{
943    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
944        Box::pin(async move {
945            let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
946            test_fcu.execute(env).await
947        })
948    }
949}
950
951/// Action to validate that a tagged block remains canonical by performing FCU to it
952#[derive(Debug)]
953pub struct ValidateCanonicalTag {
954    /// Tag name of the block to validate as canonical
955    pub tag: String,
956}
957
958impl ValidateCanonicalTag {
959    /// Create a new `ValidateCanonicalTag` action
960    pub fn new(tag: impl Into<String>) -> Self {
961        Self { tag: tag.into() }
962    }
963}
964
965impl<Engine> Action<Engine> for ValidateCanonicalTag
966where
967    Engine: EngineTypes,
968{
969    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
970        Box::pin(async move {
971            let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
972            expect_valid.execute(env).await?;
973
974            debug!("Successfully validated that '{}' remains canonical", self.tag);
975            Ok(())
976        })
977    }
978}
979
980/// Action that produces blocks locally without broadcasting to other nodes
981/// This sends the payload only to the active node to ensure it's available locally
982#[derive(Debug)]
983pub struct ProduceBlocksLocally<Engine> {
984    /// Number of blocks to produce
985    pub num_blocks: u64,
986    /// Tracks engine type
987    _phantom: PhantomData<Engine>,
988}
989
990impl<Engine> ProduceBlocksLocally<Engine> {
991    /// Create a new `ProduceBlocksLocally` action
992    pub fn new(num_blocks: u64) -> Self {
993        Self { num_blocks, _phantom: Default::default() }
994    }
995}
996
997impl<Engine> Default for ProduceBlocksLocally<Engine> {
998    fn default() -> Self {
999        Self::new(0)
1000    }
1001}
1002
1003impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1004where
1005    Engine: EngineTypes + PayloadTypes,
1006    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1007    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1008{
1009    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1010        Box::pin(async move {
1011            // Remember the active node to ensure all blocks are produced on the same node
1012            let producer_idx = env.active_node_idx;
1013
1014            for _ in 0..self.num_blocks {
1015                // Ensure we always use the same producer
1016                env.last_producer_idx = Some(producer_idx);
1017
1018                // create a sequence that produces blocks and sends only to active node
1019                let mut sequence = Sequence::new(vec![
1020                    // Skip PickNextBlockProducer to maintain the same producer
1021                    Box::new(GeneratePayloadAttributes::default()),
1022                    Box::new(GenerateNextPayload::default()),
1023                    // Send payload only to the active node to make it available
1024                    Box::new(BroadcastNextNewPayload::with_active_node()),
1025                    Box::new(UpdateBlockInfoToLatestPayload::default()),
1026                ]);
1027                sequence.execute(env).await?;
1028            }
1029            Ok(())
1030        })
1031    }
1032}
1033
1034/// Action that produces a sequence of blocks where some blocks are intentionally invalid
1035#[derive(Debug)]
1036pub struct ProduceInvalidBlocks<Engine> {
1037    /// Number of blocks to produce
1038    pub num_blocks: u64,
1039    /// Set of indices (0-based) where blocks should be made invalid
1040    pub invalid_indices: HashSet<u64>,
1041    /// Tracks engine type
1042    _phantom: PhantomData<Engine>,
1043}
1044
1045impl<Engine> ProduceInvalidBlocks<Engine> {
1046    /// Create a new `ProduceInvalidBlocks` action
1047    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
1048        Self { num_blocks, invalid_indices, _phantom: Default::default() }
1049    }
1050
1051    /// Create a new `ProduceInvalidBlocks` action with a single invalid block at the specified
1052    /// index
1053    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
1054        let mut invalid_indices = HashSet::new();
1055        invalid_indices.insert(invalid_index);
1056        Self::new(num_blocks, invalid_indices)
1057    }
1058}
1059
1060impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1061where
1062    Engine: EngineTypes + PayloadTypes,
1063    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1064    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1065{
1066    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1067        Box::pin(async move {
1068            for block_index in 0..self.num_blocks {
1069                let is_invalid = self.invalid_indices.contains(&block_index);
1070
1071                if is_invalid {
1072                    debug!("Producing invalid block at index {}", block_index);
1073
1074                    // produce a valid block first, then corrupt it
1075                    let mut sequence = Sequence::new(vec![
1076                        Box::new(PickNextBlockProducer::default()),
1077                        Box::new(GeneratePayloadAttributes::default()),
1078                        Box::new(GenerateNextPayload::default()),
1079                    ]);
1080                    sequence.execute(env).await?;
1081
1082                    // get the latest payload and corrupt it
1083                    let latest_envelope =
1084                        env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1085                            || eyre::eyre!("No payload envelope available to corrupt"),
1086                        )?;
1087
1088                    let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1089                    let mut corrupted_payload = envelope_v3.execution_payload;
1090
1091                    // corrupt the state root to make the block invalid
1092                    corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1093
1094                    debug!(
1095                        "Corrupted state root for block {} to: {}",
1096                        block_index, corrupted_payload.payload_inner.payload_inner.state_root
1097                    );
1098
1099                    // send the corrupted payload via newPayload
1100                    let engine_client = env.node_clients[0].engine.http_client();
1101                    // for simplicity, we'll use empty versioned hashes for invalid block testing
1102                    let versioned_hashes = Vec::new();
1103                    // use a random parent beacon block root since this is for invalid block testing
1104                    let parent_beacon_block_root = B256::random();
1105
1106                    let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1107                        &engine_client,
1108                        corrupted_payload.clone(),
1109                        versioned_hashes,
1110                        parent_beacon_block_root,
1111                    )
1112                    .await?;
1113
1114                    // expect the payload to be rejected as invalid
1115                    match new_payload_response.status {
1116                        PayloadStatusEnum::Invalid { validation_error } => {
1117                            debug!(
1118                                "Block {} correctly rejected as invalid: {:?}",
1119                                block_index, validation_error
1120                            );
1121                        }
1122                        other_status => {
1123                            return Err(eyre::eyre!(
1124                                "Expected block {} to be rejected as INVALID, but got: {:?}",
1125                                block_index,
1126                                other_status
1127                            ));
1128                        }
1129                    }
1130
1131                    // update block info with the corrupted block (for potential future reference)
1132                    env.set_current_block_info(BlockInfo {
1133                        hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1134                        number: corrupted_payload.payload_inner.payload_inner.block_number,
1135                        timestamp: corrupted_payload.timestamp(),
1136                    })?;
1137                } else {
1138                    debug!("Producing valid block at index {}", block_index);
1139
1140                    // produce a valid block normally
1141                    let mut sequence = Sequence::new(vec![
1142                        Box::new(PickNextBlockProducer::default()),
1143                        Box::new(GeneratePayloadAttributes::default()),
1144                        Box::new(GenerateNextPayload::default()),
1145                        Box::new(BroadcastNextNewPayload::default()),
1146                        Box::new(UpdateBlockInfoToLatestPayload::default()),
1147                    ]);
1148                    sequence.execute(env).await?;
1149                }
1150            }
1151            Ok(())
1152        })
1153    }
1154}