reth_e2e_test_utils/testsuite/actions/
produce_blocks.rs

1//! Block production actions for the e2e testing framework.
2
3use crate::testsuite::{
4    actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5    BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9    payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_node_api::{EngineTypes, PayloadTypes};
15use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
16use std::{collections::HashSet, marker::PhantomData, time::Duration};
17use tokio::time::sleep;
18use tracing::debug;
19
20/// Mine a single block with the given transactions and verify the block was created
21/// successfully.
22#[derive(Debug)]
23pub struct AssertMineBlock<Engine>
24where
25    Engine: PayloadTypes,
26{
27    /// The node index to mine
28    pub node_idx: usize,
29    /// Transactions to include in the block
30    pub transactions: Vec<Bytes>,
31    /// Expected block hash (optional)
32    pub expected_hash: Option<B256>,
33    /// Block's payload attributes
34    // TODO: refactor once we have actions to generate payload attributes.
35    pub payload_attributes: Engine::PayloadAttributes,
36    /// Tracks engine type
37    _phantom: PhantomData<Engine>,
38}
39
40impl<Engine> AssertMineBlock<Engine>
41where
42    Engine: PayloadTypes,
43{
44    /// Create a new `AssertMineBlock` action
45    pub fn new(
46        node_idx: usize,
47        transactions: Vec<Bytes>,
48        expected_hash: Option<B256>,
49        payload_attributes: Engine::PayloadAttributes,
50    ) -> Self {
51        Self {
52            node_idx,
53            transactions,
54            expected_hash,
55            payload_attributes,
56            _phantom: Default::default(),
57        }
58    }
59}
60
61impl<Engine> Action<Engine> for AssertMineBlock<Engine>
62where
63    Engine: EngineTypes,
64{
65    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
66        Box::pin(async move {
67            if self.node_idx >= env.node_clients.len() {
68                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
69            }
70
71            let node_client = &env.node_clients[self.node_idx];
72            let rpc_client = &node_client.rpc;
73            let engine_client = node_client.engine.http_client();
74
75            // get the latest block to use as parent
76            let latest_block = EthApiClient::<
77                TransactionRequest,
78                Transaction,
79                Block,
80                Receipt,
81                Header,
82            >::block_by_number(
83                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
84            )
85            .await?;
86
87            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
88            let parent_hash = latest_block.header.hash;
89
90            debug!("Latest block hash: {parent_hash}");
91
92            // create a simple forkchoice state with the latest block as head
93            let fork_choice_state = ForkchoiceState {
94                head_block_hash: parent_hash,
95                safe_block_hash: parent_hash,
96                finalized_block_hash: parent_hash,
97            };
98
99            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
100                &engine_client,
101                fork_choice_state,
102                Some(self.payload_attributes.clone()),
103            )
104            .await?;
105
106            debug!("FCU result: {:?}", fcu_result);
107
108            // check if we got a valid payload ID
109            match fcu_result.payload_status.status {
110                PayloadStatusEnum::Valid => {
111                    if let Some(payload_id) = fcu_result.payload_id {
112                        debug!("Got payload ID: {payload_id}");
113
114                        // get the payload that was built
115                        let _engine_payload =
116                            EngineApiClient::<Engine>::get_payload_v2(&engine_client, payload_id)
117                                .await?;
118                        Ok(())
119                    } else {
120                        Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
121                    }
122                }
123                _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
124            }
125        })
126    }
127}
128
129/// Pick the next block producer based on the latest block information.
130#[derive(Debug, Default)]
131pub struct PickNextBlockProducer {}
132
133impl PickNextBlockProducer {
134    /// Create a new `PickNextBlockProducer` action
135    pub const fn new() -> Self {
136        Self {}
137    }
138}
139
140impl<Engine> Action<Engine> for PickNextBlockProducer
141where
142    Engine: EngineTypes,
143{
144    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
145        Box::pin(async move {
146            let num_clients = env.node_clients.len();
147            if num_clients == 0 {
148                return Err(eyre::eyre!("No node clients available"));
149            }
150
151            let latest_info = env
152                .current_block_info()
153                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
154
155            // simple round-robin selection based on next block number
156            let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
157
158            env.last_producer_idx = Some(next_producer_idx);
159            debug!(
160                "Selected node {} as the next block producer for block {}",
161                next_producer_idx,
162                latest_info.number + 1
163            );
164
165            Ok(())
166        })
167    }
168}
169
170/// Store payload attributes for the next block.
171#[derive(Debug, Default)]
172pub struct GeneratePayloadAttributes {}
173
174impl<Engine> Action<Engine> for GeneratePayloadAttributes
175where
176    Engine: EngineTypes + PayloadTypes,
177    Engine::PayloadAttributes: From<PayloadAttributes>,
178{
179    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
180        Box::pin(async move {
181            let latest_block = env
182                .current_block_info()
183                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
184            let block_number = latest_block.number;
185            let timestamp =
186                env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
187            let payload_attributes = PayloadAttributes {
188                timestamp,
189                prev_randao: B256::random(),
190                suggested_fee_recipient: alloy_primitives::Address::random(),
191                withdrawals: Some(vec![]),
192                parent_beacon_block_root: Some(B256::ZERO),
193            };
194
195            env.active_node_state_mut()?
196                .payload_attributes
197                .insert(latest_block.number + 1, payload_attributes);
198            debug!("Stored payload attributes for block {}", block_number + 1);
199            Ok(())
200        })
201    }
202}
203
204/// Action that generates the next payload
205#[derive(Debug, Default)]
206pub struct GenerateNextPayload {}
207
208impl<Engine> Action<Engine> for GenerateNextPayload
209where
210    Engine: EngineTypes + PayloadTypes,
211    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
212{
213    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
214        Box::pin(async move {
215            let latest_block = env
216                .current_block_info()
217                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
218
219            let parent_hash = latest_block.hash;
220            debug!("Latest block hash: {parent_hash}");
221
222            let fork_choice_state = ForkchoiceState {
223                head_block_hash: parent_hash,
224                safe_block_hash: parent_hash,
225                finalized_block_hash: parent_hash,
226            };
227
228            let payload_attributes = env
229                .active_node_state()?
230                .payload_attributes
231                .get(&(latest_block.number + 1))
232                .cloned()
233                .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
234
235            let producer_idx =
236                env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
237
238            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
239                &env.node_clients[producer_idx].engine.http_client(),
240                fork_choice_state,
241                Some(payload_attributes.clone().into()),
242            )
243            .await?;
244
245            debug!("FCU result: {:?}", fcu_result);
246
247            // validate the FCU status before proceeding
248            // Note: In the context of GenerateNextPayload, Syncing usually means the engine
249            // doesn't have the requested head block, which should be an error
250            expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
251
252            let payload_id = if let Some(payload_id) = fcu_result.payload_id {
253                debug!("Received new payload ID: {:?}", payload_id);
254                payload_id
255            } else {
256                debug!("No payload ID returned, generating fresh payload attributes for forking");
257
258                let fresh_payload_attributes = PayloadAttributes {
259                    timestamp: env.active_node_state()?.latest_header_time +
260                        env.block_timestamp_increment,
261                    prev_randao: B256::random(),
262                    suggested_fee_recipient: alloy_primitives::Address::random(),
263                    withdrawals: Some(vec![]),
264                    parent_beacon_block_root: Some(B256::ZERO),
265                };
266
267                let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
268                    &env.node_clients[producer_idx].engine.http_client(),
269                    fork_choice_state,
270                    Some(fresh_payload_attributes.clone().into()),
271                )
272                .await?;
273
274                debug!("Fresh FCU result: {:?}", fresh_fcu_result);
275
276                // validate the fresh FCU status
277                expect_fcu_not_syncing_or_accepted(
278                    &fresh_fcu_result,
279                    "GenerateNextPayload (fresh)",
280                )?;
281
282                if let Some(payload_id) = fresh_fcu_result.payload_id {
283                    payload_id
284                } else {
285                    debug!("Engine considers the fork base already canonical, skipping payload generation");
286                    return Ok(());
287                }
288            };
289
290            env.active_node_state_mut()?.next_payload_id = Some(payload_id);
291
292            sleep(Duration::from_secs(1)).await;
293
294            let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
295                &env.node_clients[producer_idx].engine.http_client(),
296                payload_id,
297            )
298            .await?;
299
300            // Store the payload attributes that were used to generate this payload
301            let built_payload = payload_attributes.clone();
302            env.active_node_state_mut()?
303                .payload_id_history
304                .insert(latest_block.number + 1, payload_id);
305            env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
306            env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
307
308            Ok(())
309        })
310    }
311}
312
313/// Action that broadcasts the latest fork choice state to all clients
314#[derive(Debug, Default)]
315pub struct BroadcastLatestForkchoice {}
316
317impl<Engine> Action<Engine> for BroadcastLatestForkchoice
318where
319    Engine: EngineTypes + PayloadTypes,
320    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
321    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
322{
323    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
324        Box::pin(async move {
325            if env.node_clients.is_empty() {
326                return Err(eyre::eyre!("No node clients available"));
327            }
328
329            // use the hash of the newly executed payload if available
330            let head_hash = if let Some(payload_envelope) =
331                &env.active_node_state()?.latest_payload_envelope
332            {
333                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
334                    payload_envelope.clone().into();
335                let new_block_hash = execution_payload_envelope
336                    .execution_payload
337                    .payload_inner
338                    .payload_inner
339                    .block_hash;
340                debug!("Using newly executed block hash as head: {new_block_hash}");
341                new_block_hash
342            } else {
343                // fallback to RPC query
344                let rpc_client = &env.node_clients[0].rpc;
345                let current_head_block = EthApiClient::<
346                    TransactionRequest,
347                    Transaction,
348                    Block,
349                    Receipt,
350                    Header,
351                >::block_by_number(
352                    rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
353                )
354                .await?
355                .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
356                debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
357                current_head_block.header.hash
358            };
359
360            let fork_choice_state = ForkchoiceState {
361                head_block_hash: head_hash,
362                safe_block_hash: head_hash,
363                finalized_block_hash: head_hash,
364            };
365            debug!(
366                "Broadcasting forkchoice update to {} clients. Head: {:?}",
367                env.node_clients.len(),
368                fork_choice_state.head_block_hash
369            );
370
371            for (idx, client) in env.node_clients.iter().enumerate() {
372                match EngineApiClient::<Engine>::fork_choice_updated_v3(
373                    &client.engine.http_client(),
374                    fork_choice_state,
375                    None,
376                )
377                .await
378                {
379                    Ok(resp) => {
380                        debug!(
381                            "Client {}: Forkchoice update status: {:?}",
382                            idx, resp.payload_status.status
383                        );
384                        // validate that the forkchoice update was accepted
385                        validate_fcu_response(&resp, &format!("Client {idx}"))?;
386                    }
387                    Err(err) => {
388                        return Err(eyre::eyre!(
389                            "Client {}: Failed to broadcast forkchoice: {:?}",
390                            idx,
391                            err
392                        ));
393                    }
394                }
395            }
396            debug!("Forkchoice update broadcasted successfully");
397            Ok(())
398        })
399    }
400}
401
402/// Action that syncs environment state with the node's canonical chain via RPC.
403///
404/// This queries the latest canonical block from the node and updates the environment
405/// to match. Typically used after forkchoice operations to ensure the environment
406/// is in sync with the node's view of the canonical chain.
407#[derive(Debug, Default)]
408pub struct UpdateBlockInfo {}
409
410impl<Engine> Action<Engine> for UpdateBlockInfo
411where
412    Engine: EngineTypes,
413{
414    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
415        Box::pin(async move {
416            // get the latest block from the first client to update environment state
417            let rpc_client = &env.node_clients[0].rpc;
418            let latest_block = EthApiClient::<
419                TransactionRequest,
420                Transaction,
421                Block,
422                Receipt,
423                Header,
424            >::block_by_number(
425                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
426            )
427            .await?
428            .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
429
430            // update environment with the new block information
431            env.set_current_block_info(BlockInfo {
432                hash: latest_block.header.hash,
433                number: latest_block.header.number,
434                timestamp: latest_block.header.timestamp,
435            })?;
436
437            env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
438            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
439                latest_block.header.hash;
440
441            debug!(
442                "Updated environment to block {} (hash: {})",
443                latest_block.header.number, latest_block.header.hash
444            );
445
446            Ok(())
447        })
448    }
449}
450
451/// Action that updates environment state using the locally produced payload.
452///
453/// This uses the execution payload stored in the environment rather than querying RPC,
454/// making it more efficient and reliable during block production. Preferred over
455/// `UpdateBlockInfo` when we have just produced a block and have the payload available.
456#[derive(Debug, Default)]
457pub struct UpdateBlockInfoToLatestPayload {}
458
459impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
460where
461    Engine: EngineTypes + PayloadTypes,
462    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
463{
464    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
465        Box::pin(async move {
466            let payload_envelope = env
467                .active_node_state()?
468                .latest_payload_envelope
469                .as_ref()
470                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
471
472            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
473                payload_envelope.clone().into();
474            let execution_payload = execution_payload_envelope.execution_payload;
475
476            let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
477            let block_number = execution_payload.payload_inner.payload_inner.block_number;
478            let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
479
480            // update environment with the new block information from the payload
481            env.set_current_block_info(BlockInfo {
482                hash: block_hash,
483                number: block_number,
484                timestamp: block_timestamp,
485            })?;
486
487            env.active_node_state_mut()?.latest_header_time = block_timestamp;
488            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
489
490            debug!(
491                "Updated environment to newly produced block {} (hash: {})",
492                block_number, block_hash
493            );
494
495            Ok(())
496        })
497    }
498}
499
500/// Action that checks whether the broadcasted new payload has been accepted
501#[derive(Debug, Default)]
502pub struct CheckPayloadAccepted {}
503
504impl<Engine> Action<Engine> for CheckPayloadAccepted
505where
506    Engine: EngineTypes,
507    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
508{
509    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
510        Box::pin(async move {
511            let mut accepted_check: bool = false;
512
513            let mut latest_block = env
514                .current_block_info()
515                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
516
517            let payload_id = *env
518                .active_node_state()?
519                .payload_id_history
520                .get(&(latest_block.number + 1))
521                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
522
523            let node_clients = env.node_clients.clone();
524            for (idx, client) in node_clients.iter().enumerate() {
525                let rpc_client = &client.rpc;
526
527                // get the last header by number using latest_head_number
528                let rpc_latest_header = EthApiClient::<
529                    TransactionRequest,
530                    Transaction,
531                    Block,
532                    Receipt,
533                    Header,
534                >::header_by_number(
535                    rpc_client, alloy_eips::BlockNumberOrTag::Latest
536                )
537                .await?
538                .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
539
540                // perform several checks
541                let next_new_payload = env
542                    .active_node_state()?
543                    .latest_payload_built
544                    .as_ref()
545                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
546
547                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
548                    &client.engine.http_client(),
549                    payload_id,
550                )
551                .await?;
552
553                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
554                let new_payload_block_hash = execution_payload_envelope
555                    .execution_payload
556                    .payload_inner
557                    .payload_inner
558                    .block_hash;
559
560                if rpc_latest_header.hash != new_payload_block_hash {
561                    debug!(
562                        "Client {}: The hash is not matched: {:?} {:?}",
563                        idx, rpc_latest_header.hash, new_payload_block_hash
564                    );
565                    continue;
566                }
567
568                if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
569                    debug!(
570                        "Client {}: difficulty != 0: {:?}",
571                        idx, rpc_latest_header.inner.difficulty
572                    );
573                    continue;
574                }
575
576                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
577                    debug!(
578                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
579                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
580                    );
581                    continue;
582                }
583
584                let extra_len = rpc_latest_header.inner.extra_data.len();
585                if extra_len <= 32 {
586                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
587                    continue;
588                }
589
590                // at least one client passes all the check, save the header in Env
591                if !accepted_check {
592                    accepted_check = true;
593                    // save the header in Env
594                    env.active_node_state_mut()?.latest_header_time = next_new_payload.timestamp;
595
596                    // add it to header history
597                    env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
598                        rpc_latest_header.hash;
599                    latest_block.hash = rpc_latest_header.hash;
600                    latest_block.number = rpc_latest_header.inner.number;
601                }
602            }
603
604            if accepted_check {
605                Ok(())
606            } else {
607                Err(eyre::eyre!("No clients passed payload acceptance checks"))
608            }
609        })
610    }
611}
612
613/// Action that broadcasts the next new payload
614#[derive(Debug, Default)]
615pub struct BroadcastNextNewPayload {
616    /// If true, only send to the active node. If false, broadcast to all nodes.
617    active_node_only: bool,
618}
619
620impl BroadcastNextNewPayload {
621    /// Create a new `BroadcastNextNewPayload` action that only sends to the active node
622    pub const fn with_active_node() -> Self {
623        Self { active_node_only: true }
624    }
625}
626
627impl<Engine> Action<Engine> for BroadcastNextNewPayload
628where
629    Engine: EngineTypes + PayloadTypes,
630    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
631    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
632{
633    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
634        Box::pin(async move {
635            // Get the next new payload to broadcast
636            let next_new_payload = env
637                .active_node_state()?
638                .latest_payload_built
639                .as_ref()
640                .ok_or_else(|| eyre::eyre!("No next built payload found"))?
641                .clone();
642            let parent_beacon_block_root = next_new_payload
643                .parent_beacon_block_root
644                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
645
646            let payload_envelope = env
647                .active_node_state()?
648                .latest_payload_envelope
649                .as_ref()
650                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
651                .clone();
652
653            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
654            let execution_payload = execution_payload_envelope.execution_payload;
655
656            if self.active_node_only {
657                // Send only to the active node
658                let active_idx = env.active_node_idx;
659                let engine = env.node_clients[active_idx].engine.http_client();
660
661                let result = EngineApiClient::<Engine>::new_payload_v3(
662                    &engine,
663                    execution_payload.clone(),
664                    vec![],
665                    parent_beacon_block_root,
666                )
667                .await?;
668
669                debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
670
671                // Validate the response
672                match result.status {
673                    PayloadStatusEnum::Valid => {
674                        env.active_node_state_mut()?.latest_payload_executed =
675                            Some(next_new_payload);
676                        Ok(())
677                    }
678                    other => Err(eyre::eyre!(
679                        "Active node {}: Unexpected payload status: {:?}",
680                        active_idx,
681                        other
682                    )),
683                }
684            } else {
685                // Loop through all clients and broadcast the next new payload
686                let mut broadcast_results = Vec::new();
687                let mut first_valid_seen = false;
688
689                for (idx, client) in env.node_clients.iter().enumerate() {
690                    let engine = client.engine.http_client();
691
692                    // Broadcast the execution payload
693                    let result = EngineApiClient::<Engine>::new_payload_v3(
694                        &engine,
695                        execution_payload.clone(),
696                        vec![],
697                        parent_beacon_block_root,
698                    )
699                    .await?;
700
701                    broadcast_results.push((idx, result.status.clone()));
702                    debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
703
704                    // Check if this node accepted the payload
705                    if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
706                        first_valid_seen = true;
707                    } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
708                        debug!(
709                            "Node {}: Invalid payload status returned from broadcast: {:?}",
710                            idx, validation_error
711                        );
712                    }
713                }
714
715                // Update the executed payload state after broadcasting to all nodes
716                if first_valid_seen {
717                    env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
718                }
719
720                // Check if at least one node accepted the payload
721                let any_valid =
722                    broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
723                if !any_valid {
724                    return Err(eyre::eyre!(
725                        "Failed to successfully broadcast payload to any client"
726                    ));
727                }
728
729                debug!("Broadcast complete. Results: {:?}", broadcast_results);
730
731                Ok(())
732            }
733        })
734    }
735}
736
737/// Action that produces a sequence of blocks using the available clients
738#[derive(Debug)]
739pub struct ProduceBlocks<Engine> {
740    /// Number of blocks to produce
741    pub num_blocks: u64,
742    /// Tracks engine type
743    _phantom: PhantomData<Engine>,
744}
745
746impl<Engine> ProduceBlocks<Engine> {
747    /// Create a new `ProduceBlocks` action
748    pub fn new(num_blocks: u64) -> Self {
749        Self { num_blocks, _phantom: Default::default() }
750    }
751}
752
753impl<Engine> Default for ProduceBlocks<Engine> {
754    fn default() -> Self {
755        Self::new(0)
756    }
757}
758
759impl<Engine> Action<Engine> for ProduceBlocks<Engine>
760where
761    Engine: EngineTypes + PayloadTypes,
762    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
763    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
764{
765    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
766        Box::pin(async move {
767            for _ in 0..self.num_blocks {
768                // create a fresh sequence for each block to avoid state pollution
769                // Note: This produces blocks but does NOT make them canonical
770                // Use MakeCanonical action explicitly if canonicalization is needed
771                let mut sequence = Sequence::new(vec![
772                    Box::new(PickNextBlockProducer::default()),
773                    Box::new(GeneratePayloadAttributes::default()),
774                    Box::new(GenerateNextPayload::default()),
775                    Box::new(BroadcastNextNewPayload::default()),
776                    Box::new(UpdateBlockInfoToLatestPayload::default()),
777                ]);
778                sequence.execute(env).await?;
779            }
780            Ok(())
781        })
782    }
783}
784
785/// Action to test forkchoice update to a tagged block with expected status
786#[derive(Debug)]
787pub struct TestFcuToTag {
788    /// Tag name of the target block
789    pub tag: String,
790    /// Expected payload status
791    pub expected_status: PayloadStatusEnum,
792}
793
794impl TestFcuToTag {
795    /// Create a new `TestFcuToTag` action
796    pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
797        Self { tag: tag.into(), expected_status }
798    }
799}
800
801impl<Engine> Action<Engine> for TestFcuToTag
802where
803    Engine: EngineTypes,
804{
805    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
806        Box::pin(async move {
807            // get the target block from the registry
808            let (target_block, _node_idx) = env
809                .block_registry
810                .get(&self.tag)
811                .copied()
812                .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
813
814            let engine_client = env.node_clients[0].engine.http_client();
815            let fcu_state = ForkchoiceState {
816                head_block_hash: target_block.hash,
817                safe_block_hash: target_block.hash,
818                finalized_block_hash: target_block.hash,
819            };
820
821            let fcu_response =
822                EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
823                    .await?;
824
825            // validate the response matches expected status
826            match (&fcu_response.payload_status.status, &self.expected_status) {
827                (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
828                    debug!("FCU to '{}' returned VALID as expected", self.tag);
829                }
830                (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
831                    debug!("FCU to '{}' returned INVALID as expected", self.tag);
832                }
833                (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
834                    debug!("FCU to '{}' returned SYNCING as expected", self.tag);
835                }
836                (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
837                    debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
838                }
839                (actual, expected) => {
840                    return Err(eyre::eyre!(
841                        "FCU to '{}': expected status {:?}, but got {:?}",
842                        self.tag,
843                        expected,
844                        actual
845                    ));
846                }
847            }
848
849            Ok(())
850        })
851    }
852}
853
854/// Action to expect a specific FCU status when targeting a tagged block
855#[derive(Debug)]
856pub struct ExpectFcuStatus {
857    /// Tag name of the target block
858    pub target_tag: String,
859    /// Expected payload status
860    pub expected_status: PayloadStatusEnum,
861}
862
863impl ExpectFcuStatus {
864    /// Create a new `ExpectFcuStatus` action expecting VALID status
865    pub fn valid(target_tag: impl Into<String>) -> Self {
866        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
867    }
868
869    /// Create a new `ExpectFcuStatus` action expecting INVALID status
870    pub fn invalid(target_tag: impl Into<String>) -> Self {
871        Self {
872            target_tag: target_tag.into(),
873            expected_status: PayloadStatusEnum::Invalid {
874                validation_error: "corrupted block".to_string(),
875            },
876        }
877    }
878
879    /// Create a new `ExpectFcuStatus` action expecting SYNCING status
880    pub fn syncing(target_tag: impl Into<String>) -> Self {
881        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
882    }
883
884    /// Create a new `ExpectFcuStatus` action expecting ACCEPTED status
885    pub fn accepted(target_tag: impl Into<String>) -> Self {
886        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
887    }
888}
889
890impl<Engine> Action<Engine> for ExpectFcuStatus
891where
892    Engine: EngineTypes,
893{
894    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
895        Box::pin(async move {
896            let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
897            test_fcu.execute(env).await
898        })
899    }
900}
901
902/// Action to validate that a tagged block remains canonical by performing FCU to it
903#[derive(Debug)]
904pub struct ValidateCanonicalTag {
905    /// Tag name of the block to validate as canonical
906    pub tag: String,
907}
908
909impl ValidateCanonicalTag {
910    /// Create a new `ValidateCanonicalTag` action
911    pub fn new(tag: impl Into<String>) -> Self {
912        Self { tag: tag.into() }
913    }
914}
915
916impl<Engine> Action<Engine> for ValidateCanonicalTag
917where
918    Engine: EngineTypes,
919{
920    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
921        Box::pin(async move {
922            let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
923            expect_valid.execute(env).await?;
924
925            debug!("Successfully validated that '{}' remains canonical", self.tag);
926            Ok(())
927        })
928    }
929}
930
931/// Action that produces blocks locally without broadcasting to other nodes
932/// This sends the payload only to the active node to ensure it's available locally
933#[derive(Debug)]
934pub struct ProduceBlocksLocally<Engine> {
935    /// Number of blocks to produce
936    pub num_blocks: u64,
937    /// Tracks engine type
938    _phantom: PhantomData<Engine>,
939}
940
941impl<Engine> ProduceBlocksLocally<Engine> {
942    /// Create a new `ProduceBlocksLocally` action
943    pub fn new(num_blocks: u64) -> Self {
944        Self { num_blocks, _phantom: Default::default() }
945    }
946}
947
948impl<Engine> Default for ProduceBlocksLocally<Engine> {
949    fn default() -> Self {
950        Self::new(0)
951    }
952}
953
954impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
955where
956    Engine: EngineTypes + PayloadTypes,
957    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
958    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
959{
960    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
961        Box::pin(async move {
962            // Remember the active node to ensure all blocks are produced on the same node
963            let producer_idx = env.active_node_idx;
964
965            for _ in 0..self.num_blocks {
966                // Ensure we always use the same producer
967                env.last_producer_idx = Some(producer_idx);
968
969                // create a sequence that produces blocks and sends only to active node
970                let mut sequence = Sequence::new(vec![
971                    // Skip PickNextBlockProducer to maintain the same producer
972                    Box::new(GeneratePayloadAttributes::default()),
973                    Box::new(GenerateNextPayload::default()),
974                    // Send payload only to the active node to make it available
975                    Box::new(BroadcastNextNewPayload::with_active_node()),
976                    Box::new(UpdateBlockInfoToLatestPayload::default()),
977                ]);
978                sequence.execute(env).await?;
979            }
980            Ok(())
981        })
982    }
983}
984
985/// Action that produces a sequence of blocks where some blocks are intentionally invalid
986#[derive(Debug)]
987pub struct ProduceInvalidBlocks<Engine> {
988    /// Number of blocks to produce
989    pub num_blocks: u64,
990    /// Set of indices (0-based) where blocks should be made invalid
991    pub invalid_indices: HashSet<u64>,
992    /// Tracks engine type
993    _phantom: PhantomData<Engine>,
994}
995
996impl<Engine> ProduceInvalidBlocks<Engine> {
997    /// Create a new `ProduceInvalidBlocks` action
998    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
999        Self { num_blocks, invalid_indices, _phantom: Default::default() }
1000    }
1001
1002    /// Create a new `ProduceInvalidBlocks` action with a single invalid block at the specified
1003    /// index
1004    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
1005        let mut invalid_indices = HashSet::new();
1006        invalid_indices.insert(invalid_index);
1007        Self::new(num_blocks, invalid_indices)
1008    }
1009}
1010
1011impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1012where
1013    Engine: EngineTypes + PayloadTypes,
1014    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1015    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1016{
1017    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1018        Box::pin(async move {
1019            for block_index in 0..self.num_blocks {
1020                let is_invalid = self.invalid_indices.contains(&block_index);
1021
1022                if is_invalid {
1023                    debug!("Producing invalid block at index {}", block_index);
1024
1025                    // produce a valid block first, then corrupt it
1026                    let mut sequence = Sequence::new(vec![
1027                        Box::new(PickNextBlockProducer::default()),
1028                        Box::new(GeneratePayloadAttributes::default()),
1029                        Box::new(GenerateNextPayload::default()),
1030                    ]);
1031                    sequence.execute(env).await?;
1032
1033                    // get the latest payload and corrupt it
1034                    let latest_envelope =
1035                        env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1036                            || eyre::eyre!("No payload envelope available to corrupt"),
1037                        )?;
1038
1039                    let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1040                    let mut corrupted_payload = envelope_v3.execution_payload;
1041
1042                    // corrupt the state root to make the block invalid
1043                    corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1044
1045                    debug!(
1046                        "Corrupted state root for block {} to: {}",
1047                        block_index, corrupted_payload.payload_inner.payload_inner.state_root
1048                    );
1049
1050                    // send the corrupted payload via newPayload
1051                    let engine_client = env.node_clients[0].engine.http_client();
1052                    // for simplicity, we'll use empty versioned hashes for invalid block testing
1053                    let versioned_hashes = Vec::new();
1054                    // use a random parent beacon block root since this is for invalid block testing
1055                    let parent_beacon_block_root = B256::random();
1056
1057                    let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1058                        &engine_client,
1059                        corrupted_payload.clone(),
1060                        versioned_hashes,
1061                        parent_beacon_block_root,
1062                    )
1063                    .await?;
1064
1065                    // expect the payload to be rejected as invalid
1066                    match new_payload_response.status {
1067                        PayloadStatusEnum::Invalid { validation_error } => {
1068                            debug!(
1069                                "Block {} correctly rejected as invalid: {:?}",
1070                                block_index, validation_error
1071                            );
1072                        }
1073                        other_status => {
1074                            return Err(eyre::eyre!(
1075                                "Expected block {} to be rejected as INVALID, but got: {:?}",
1076                                block_index,
1077                                other_status
1078                            ));
1079                        }
1080                    }
1081
1082                    // update block info with the corrupted block (for potential future reference)
1083                    env.set_current_block_info(BlockInfo {
1084                        hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1085                        number: corrupted_payload.payload_inner.payload_inner.block_number,
1086                        timestamp: corrupted_payload.timestamp(),
1087                    })?;
1088                } else {
1089                    debug!("Producing valid block at index {}", block_index);
1090
1091                    // produce a valid block normally
1092                    let mut sequence = Sequence::new(vec![
1093                        Box::new(PickNextBlockProducer::default()),
1094                        Box::new(GeneratePayloadAttributes::default()),
1095                        Box::new(GenerateNextPayload::default()),
1096                        Box::new(BroadcastNextNewPayload::default()),
1097                        Box::new(UpdateBlockInfoToLatestPayload::default()),
1098                    ]);
1099                    sequence.execute(env).await?;
1100                }
1101            }
1102            Ok(())
1103        })
1104    }
1105}