reth_e2e_test_utils/testsuite/actions/
produce_blocks.rs

1//! Block production actions for the e2e testing framework.
2
3use crate::testsuite::{
4    actions::{expect_fcu_not_syncing_or_accepted, validate_fcu_response, Action, Sequence},
5    BlockInfo, Environment,
6};
7use alloy_primitives::{Bytes, B256};
8use alloy_rpc_types_engine::{
9    payload::ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
10};
11use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
12use eyre::Result;
13use futures_util::future::BoxFuture;
14use reth_ethereum_primitives::TransactionSigned;
15use reth_node_api::{EngineTypes, PayloadTypes};
16use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
17use std::{collections::HashSet, marker::PhantomData, time::Duration};
18use tokio::time::sleep;
19use tracing::debug;
20
21/// Mine a single block with the given transactions and verify the block was created
22/// successfully.
23#[derive(Debug)]
24pub struct AssertMineBlock<Engine>
25where
26    Engine: PayloadTypes,
27{
28    /// The node index to mine
29    pub node_idx: usize,
30    /// Transactions to include in the block
31    pub transactions: Vec<Bytes>,
32    /// Expected block hash (optional)
33    pub expected_hash: Option<B256>,
34    /// Block's payload attributes
35    // TODO: refactor once we have actions to generate payload attributes.
36    pub payload_attributes: Engine::PayloadAttributes,
37    /// Tracks engine type
38    _phantom: PhantomData<Engine>,
39}
40
41impl<Engine> AssertMineBlock<Engine>
42where
43    Engine: PayloadTypes,
44{
45    /// Create a new `AssertMineBlock` action
46    pub fn new(
47        node_idx: usize,
48        transactions: Vec<Bytes>,
49        expected_hash: Option<B256>,
50        payload_attributes: Engine::PayloadAttributes,
51    ) -> Self {
52        Self {
53            node_idx,
54            transactions,
55            expected_hash,
56            payload_attributes,
57            _phantom: Default::default(),
58        }
59    }
60}
61
62impl<Engine> Action<Engine> for AssertMineBlock<Engine>
63where
64    Engine: EngineTypes,
65{
66    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
67        Box::pin(async move {
68            if self.node_idx >= env.node_clients.len() {
69                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
70            }
71
72            let node_client = &env.node_clients[self.node_idx];
73            let rpc_client = &node_client.rpc;
74            let engine_client = node_client.engine.http_client();
75
76            // get the latest block to use as parent
77            let latest_block = EthApiClient::<
78                TransactionRequest,
79                Transaction,
80                Block,
81                Receipt,
82                Header,
83                TransactionSigned,
84            >::block_by_number(
85                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
86            )
87            .await?;
88
89            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
90            let parent_hash = latest_block.header.hash;
91
92            debug!("Latest block hash: {parent_hash}");
93
94            // create a simple forkchoice state with the latest block as head
95            let fork_choice_state = ForkchoiceState {
96                head_block_hash: parent_hash,
97                safe_block_hash: parent_hash,
98                finalized_block_hash: parent_hash,
99            };
100
101            // Try v2 first for backwards compatibility, fall back to v3 on error.
102            match EngineApiClient::<Engine>::fork_choice_updated_v2(
103                &engine_client,
104                fork_choice_state,
105                Some(self.payload_attributes.clone()),
106            )
107            .await
108            {
109                Ok(fcu_result) => {
110                    debug!(?fcu_result, "FCU v2 result");
111                    match fcu_result.payload_status.status {
112                        PayloadStatusEnum::Valid => {
113                            if let Some(payload_id) = fcu_result.payload_id {
114                                debug!(id=%payload_id, "Got payload");
115                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v2(
116                                    &engine_client,
117                                    payload_id,
118                                )
119                                .await?;
120                                Ok(())
121                            } else {
122                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
123                            }
124                        }
125                        _ => Err(eyre::eyre!(
126                            "Payload status not valid: {:?}",
127                            fcu_result.payload_status
128                        ))?,
129                    }
130                }
131                Err(_) => {
132                    // If v2 fails due to unsupported fork/missing fields, try v3
133                    let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
134                        &engine_client,
135                        fork_choice_state,
136                        Some(self.payload_attributes.clone()),
137                    )
138                    .await?;
139
140                    debug!(?fcu_result, "FCU v3 result");
141                    match fcu_result.payload_status.status {
142                        PayloadStatusEnum::Valid => {
143                            if let Some(payload_id) = fcu_result.payload_id {
144                                debug!(id=%payload_id, "Got payload");
145                                let _engine_payload = EngineApiClient::<Engine>::get_payload_v3(
146                                    &engine_client,
147                                    payload_id,
148                                )
149                                .await?;
150                                Ok(())
151                            } else {
152                                Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
153                            }
154                        }
155                        _ => Err(eyre::eyre!(
156                            "Payload status not valid: {:?}",
157                            fcu_result.payload_status
158                        )),
159                    }
160                }
161            }
162        })
163    }
164}
165
/// Action that selects which node produces the next block, based on the latest block
/// information held by the environment.
#[derive(Debug, Default)]
pub struct PickNextBlockProducer {}

impl PickNextBlockProducer {
    /// Returns a fresh `PickNextBlockProducer` action.
    pub const fn new() -> Self {
        Self {}
    }
}
176
177impl<Engine> Action<Engine> for PickNextBlockProducer
178where
179    Engine: EngineTypes,
180{
181    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
182        Box::pin(async move {
183            let num_clients = env.node_clients.len();
184            if num_clients == 0 {
185                return Err(eyre::eyre!("No node clients available"));
186            }
187
188            let latest_info = env
189                .current_block_info()
190                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
191
192            // simple round-robin selection based on next block number
193            let next_producer_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
194
195            env.last_producer_idx = Some(next_producer_idx);
196            debug!(
197                "Selected node {} as the next block producer for block {}",
198                next_producer_idx,
199                latest_info.number + 1
200            );
201
202            Ok(())
203        })
204    }
205}
206
207/// Store payload attributes for the next block.
208#[derive(Debug, Default)]
209pub struct GeneratePayloadAttributes {}
210
211impl<Engine> Action<Engine> for GeneratePayloadAttributes
212where
213    Engine: EngineTypes + PayloadTypes,
214    Engine::PayloadAttributes: From<PayloadAttributes>,
215{
216    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
217        Box::pin(async move {
218            let latest_block = env
219                .current_block_info()
220                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
221            let block_number = latest_block.number;
222            let timestamp =
223                env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
224            let payload_attributes = PayloadAttributes {
225                timestamp,
226                prev_randao: B256::random(),
227                suggested_fee_recipient: alloy_primitives::Address::random(),
228                withdrawals: Some(vec![]),
229                parent_beacon_block_root: Some(B256::ZERO),
230            };
231
232            env.active_node_state_mut()?
233                .payload_attributes
234                .insert(latest_block.number + 1, payload_attributes);
235            debug!("Stored payload attributes for block {}", block_number + 1);
236            Ok(())
237        })
238    }
239}
240
241/// Action that generates the next payload
242#[derive(Debug, Default)]
243pub struct GenerateNextPayload {}
244
245impl<Engine> Action<Engine> for GenerateNextPayload
246where
247    Engine: EngineTypes + PayloadTypes,
248    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
249{
250    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
251        Box::pin(async move {
252            let latest_block = env
253                .current_block_info()
254                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
255
256            let parent_hash = latest_block.hash;
257            debug!("Latest block hash: {parent_hash}");
258
259            let fork_choice_state = ForkchoiceState {
260                head_block_hash: parent_hash,
261                safe_block_hash: parent_hash,
262                finalized_block_hash: parent_hash,
263            };
264
265            let payload_attributes = env
266                .active_node_state()?
267                .payload_attributes
268                .get(&(latest_block.number + 1))
269                .cloned()
270                .ok_or_else(|| eyre::eyre!("No payload attributes found for next block"))?;
271
272            let producer_idx =
273                env.last_producer_idx.ok_or_else(|| eyre::eyre!("No block producer selected"))?;
274
275            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
276                &env.node_clients[producer_idx].engine.http_client(),
277                fork_choice_state,
278                Some(payload_attributes.clone().into()),
279            )
280            .await?;
281
282            debug!("FCU result: {:?}", fcu_result);
283
284            // validate the FCU status before proceeding
285            // Note: In the context of GenerateNextPayload, Syncing usually means the engine
286            // doesn't have the requested head block, which should be an error
287            expect_fcu_not_syncing_or_accepted(&fcu_result, "GenerateNextPayload")?;
288
289            let payload_id = if let Some(payload_id) = fcu_result.payload_id {
290                debug!("Received new payload ID: {:?}", payload_id);
291                payload_id
292            } else {
293                debug!("No payload ID returned, generating fresh payload attributes for forking");
294
295                let fresh_payload_attributes = PayloadAttributes {
296                    timestamp: env.active_node_state()?.latest_header_time +
297                        env.block_timestamp_increment,
298                    prev_randao: B256::random(),
299                    suggested_fee_recipient: alloy_primitives::Address::random(),
300                    withdrawals: Some(vec![]),
301                    parent_beacon_block_root: Some(B256::ZERO),
302                };
303
304                let fresh_fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
305                    &env.node_clients[producer_idx].engine.http_client(),
306                    fork_choice_state,
307                    Some(fresh_payload_attributes.clone().into()),
308                )
309                .await?;
310
311                debug!("Fresh FCU result: {:?}", fresh_fcu_result);
312
313                // validate the fresh FCU status
314                expect_fcu_not_syncing_or_accepted(
315                    &fresh_fcu_result,
316                    "GenerateNextPayload (fresh)",
317                )?;
318
319                if let Some(payload_id) = fresh_fcu_result.payload_id {
320                    payload_id
321                } else {
322                    debug!("Engine considers the fork base already canonical, skipping payload generation");
323                    return Ok(());
324                }
325            };
326
327            env.active_node_state_mut()?.next_payload_id = Some(payload_id);
328
329            sleep(Duration::from_secs(1)).await;
330
331            let built_payload_envelope = EngineApiClient::<Engine>::get_payload_v3(
332                &env.node_clients[producer_idx].engine.http_client(),
333                payload_id,
334            )
335            .await?;
336
337            // Store the payload attributes that were used to generate this payload
338            let built_payload = payload_attributes.clone();
339            env.active_node_state_mut()?
340                .payload_id_history
341                .insert(latest_block.number + 1, payload_id);
342            env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
343            env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
344
345            Ok(())
346        })
347    }
348}
349
350/// Action that broadcasts the latest fork choice state to all clients
351#[derive(Debug, Default)]
352pub struct BroadcastLatestForkchoice {}
353
354impl<Engine> Action<Engine> for BroadcastLatestForkchoice
355where
356    Engine: EngineTypes + PayloadTypes,
357    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
358    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
359{
360    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
361        Box::pin(async move {
362            if env.node_clients.is_empty() {
363                return Err(eyre::eyre!("No node clients available"));
364            }
365
366            // use the hash of the newly executed payload if available
367            let head_hash = if let Some(payload_envelope) =
368                &env.active_node_state()?.latest_payload_envelope
369            {
370                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
371                    payload_envelope.clone().into();
372                let new_block_hash = execution_payload_envelope
373                    .execution_payload
374                    .payload_inner
375                    .payload_inner
376                    .block_hash;
377                debug!("Using newly executed block hash as head: {new_block_hash}");
378                new_block_hash
379            } else {
380                // fallback to RPC query
381                let rpc_client = &env.node_clients[0].rpc;
382                let current_head_block = EthApiClient::<
383                    TransactionRequest,
384                    Transaction,
385                    Block,
386                    Receipt,
387                    Header,
388                    TransactionSigned,
389                >::block_by_number(
390                    rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
391                )
392                .await?
393                .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
394                debug!("Using RPC latest block hash as head: {}", current_head_block.header.hash);
395                current_head_block.header.hash
396            };
397
398            let fork_choice_state = ForkchoiceState {
399                head_block_hash: head_hash,
400                safe_block_hash: head_hash,
401                finalized_block_hash: head_hash,
402            };
403            debug!(
404                "Broadcasting forkchoice update to {} clients. Head: {:?}",
405                env.node_clients.len(),
406                fork_choice_state.head_block_hash
407            );
408
409            for (idx, client) in env.node_clients.iter().enumerate() {
410                match EngineApiClient::<Engine>::fork_choice_updated_v3(
411                    &client.engine.http_client(),
412                    fork_choice_state,
413                    None,
414                )
415                .await
416                {
417                    Ok(resp) => {
418                        debug!(
419                            "Client {}: Forkchoice update status: {:?}",
420                            idx, resp.payload_status.status
421                        );
422                        // validate that the forkchoice update was accepted
423                        validate_fcu_response(&resp, &format!("Client {idx}"))?;
424                    }
425                    Err(err) => {
426                        return Err(eyre::eyre!(
427                            "Client {}: Failed to broadcast forkchoice: {:?}",
428                            idx,
429                            err
430                        ));
431                    }
432                }
433            }
434            debug!("Forkchoice update broadcasted successfully");
435            Ok(())
436        })
437    }
438}
439
440/// Action that syncs environment state with the node's canonical chain via RPC.
441///
442/// This queries the latest canonical block from the node and updates the environment
443/// to match. Typically used after forkchoice operations to ensure the environment
444/// is in sync with the node's view of the canonical chain.
445#[derive(Debug, Default)]
446pub struct UpdateBlockInfo {}
447
448impl<Engine> Action<Engine> for UpdateBlockInfo
449where
450    Engine: EngineTypes,
451{
452    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
453        Box::pin(async move {
454            // get the latest block from the first client to update environment state
455            let rpc_client = &env.node_clients[0].rpc;
456            let latest_block = EthApiClient::<
457                TransactionRequest,
458                Transaction,
459                Block,
460                Receipt,
461                Header,
462                TransactionSigned,
463            >::block_by_number(
464                rpc_client, alloy_eips::BlockNumberOrTag::Latest, false
465            )
466            .await?
467            .ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
468
469            // update environment with the new block information
470            env.set_current_block_info(BlockInfo {
471                hash: latest_block.header.hash,
472                number: latest_block.header.number,
473                timestamp: latest_block.header.timestamp,
474            })?;
475
476            env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
477            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
478                latest_block.header.hash;
479
480            debug!(
481                "Updated environment to block {} (hash: {})",
482                latest_block.header.number, latest_block.header.hash
483            );
484
485            Ok(())
486        })
487    }
488}
489
490/// Action that updates environment state using the locally produced payload.
491///
492/// This uses the execution payload stored in the environment rather than querying RPC,
493/// making it more efficient and reliable during block production. Preferred over
494/// `UpdateBlockInfo` when we have just produced a block and have the payload available.
495#[derive(Debug, Default)]
496pub struct UpdateBlockInfoToLatestPayload {}
497
498impl<Engine> Action<Engine> for UpdateBlockInfoToLatestPayload
499where
500    Engine: EngineTypes + PayloadTypes,
501    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
502{
503    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
504        Box::pin(async move {
505            let payload_envelope = env
506                .active_node_state()?
507                .latest_payload_envelope
508                .as_ref()
509                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
510
511            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
512                payload_envelope.clone().into();
513            let execution_payload = execution_payload_envelope.execution_payload;
514
515            let block_hash = execution_payload.payload_inner.payload_inner.block_hash;
516            let block_number = execution_payload.payload_inner.payload_inner.block_number;
517            let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
518
519            // update environment with the new block information from the payload
520            env.set_current_block_info(BlockInfo {
521                hash: block_hash,
522                number: block_number,
523                timestamp: block_timestamp,
524            })?;
525
526            env.active_node_state_mut()?.latest_header_time = block_timestamp;
527            env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
528
529            debug!(
530                "Updated environment to newly produced block {} (hash: {})",
531                block_number, block_hash
532            );
533
534            Ok(())
535        })
536    }
537}
538
539/// Action that checks whether the broadcasted new payload has been accepted
540#[derive(Debug, Default)]
541pub struct CheckPayloadAccepted {}
542
543impl<Engine> Action<Engine> for CheckPayloadAccepted
544where
545    Engine: EngineTypes,
546    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
547{
548    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
549        Box::pin(async move {
550            let mut accepted_check: bool = false;
551
552            let latest_block = env
553                .current_block_info()
554                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
555
556            let payload_id = *env
557                .active_node_state()?
558                .payload_id_history
559                .get(&(latest_block.number + 1))
560                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
561
562            let node_clients = env.node_clients.clone();
563            for (idx, client) in node_clients.iter().enumerate() {
564                let rpc_client = &client.rpc;
565
566                // get the last header by number using latest_head_number
567                let rpc_latest_header = EthApiClient::<
568                    TransactionRequest,
569                    Transaction,
570                    Block,
571                    Receipt,
572                    Header,
573                    TransactionSigned,
574                >::header_by_number(
575                    rpc_client, alloy_eips::BlockNumberOrTag::Latest
576                )
577                .await?
578                .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
579
580                // perform several checks
581                let next_new_payload = env
582                    .active_node_state()?
583                    .latest_payload_built
584                    .as_ref()
585                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
586
587                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
588                    &client.engine.http_client(),
589                    payload_id,
590                )
591                .await?;
592
593                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload.into();
594                let new_payload_block_hash = execution_payload_envelope
595                    .execution_payload
596                    .payload_inner
597                    .payload_inner
598                    .block_hash;
599
600                if rpc_latest_header.hash != new_payload_block_hash {
601                    debug!(
602                        "Client {}: The hash is not matched: {:?} {:?}",
603                        idx, rpc_latest_header.hash, new_payload_block_hash
604                    );
605                    continue;
606                }
607
608                if rpc_latest_header.inner.difficulty != alloy_primitives::U256::ZERO {
609                    debug!(
610                        "Client {}: difficulty != 0: {:?}",
611                        idx, rpc_latest_header.inner.difficulty
612                    );
613                    continue;
614                }
615
616                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
617                    debug!(
618                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
619                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
620                    );
621                    continue;
622                }
623
624                let extra_len = rpc_latest_header.inner.extra_data.len();
625                if extra_len <= 32 {
626                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
627                    continue;
628                }
629
630                // at least one client passes all the check, save the header in Env
631                if !accepted_check {
632                    accepted_check = true;
633                    // save the current block info in Env
634                    env.set_current_block_info(BlockInfo {
635                        hash: rpc_latest_header.hash,
636                        number: rpc_latest_header.inner.number,
637                        timestamp: rpc_latest_header.inner.timestamp,
638                    })?;
639
640                    // align latest header time and forkchoice state with the accepted canonical
641                    // head
642                    env.active_node_state_mut()?.latest_header_time =
643                        rpc_latest_header.inner.timestamp;
644                    env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
645                        rpc_latest_header.hash;
646                }
647            }
648
649            if accepted_check {
650                Ok(())
651            } else {
652                Err(eyre::eyre!("No clients passed payload acceptance checks"))
653            }
654        })
655    }
656}
657
/// Action that submits the most recently built payload to the nodes via `new_payload`.
#[derive(Debug, Default)]
pub struct BroadcastNextNewPayload {
    /// `true` sends the payload to the active node only; `false` (the default) sends it to
    /// every node.
    active_node_only: bool,
}

impl BroadcastNextNewPayload {
    /// Builds a `BroadcastNextNewPayload` action restricted to the active node.
    pub const fn with_active_node() -> Self {
        Self { active_node_only: true }
    }
}
671
672impl<Engine> Action<Engine> for BroadcastNextNewPayload
673where
674    Engine: EngineTypes + PayloadTypes,
675    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
676    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
677{
678    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
679        Box::pin(async move {
680            // Get the next new payload to broadcast
681            let next_new_payload = env
682                .active_node_state()?
683                .latest_payload_built
684                .as_ref()
685                .ok_or_else(|| eyre::eyre!("No next built payload found"))?
686                .clone();
687            let parent_beacon_block_root = next_new_payload
688                .parent_beacon_block_root
689                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
690
691            let payload_envelope = env
692                .active_node_state()?
693                .latest_payload_envelope
694                .as_ref()
695                .ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
696                .clone();
697
698            let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
699            let execution_payload = execution_payload_envelope.execution_payload;
700
701            if self.active_node_only {
702                // Send only to the active node
703                let active_idx = env.active_node_idx;
704                let engine = env.node_clients[active_idx].engine.http_client();
705
706                let result = EngineApiClient::<Engine>::new_payload_v3(
707                    &engine,
708                    execution_payload.clone(),
709                    vec![],
710                    parent_beacon_block_root,
711                )
712                .await?;
713
714                debug!("Active node {}: new_payload status: {:?}", active_idx, result.status);
715
716                // Validate the response
717                match result.status {
718                    PayloadStatusEnum::Valid => {
719                        env.active_node_state_mut()?.latest_payload_executed =
720                            Some(next_new_payload);
721                        Ok(())
722                    }
723                    other => Err(eyre::eyre!(
724                        "Active node {}: Unexpected payload status: {:?}",
725                        active_idx,
726                        other
727                    )),
728                }
729            } else {
730                // Loop through all clients and broadcast the next new payload
731                let mut broadcast_results = Vec::new();
732                let mut first_valid_seen = false;
733
734                for (idx, client) in env.node_clients.iter().enumerate() {
735                    let engine = client.engine.http_client();
736
737                    // Broadcast the execution payload
738                    let result = EngineApiClient::<Engine>::new_payload_v3(
739                        &engine,
740                        execution_payload.clone(),
741                        vec![],
742                        parent_beacon_block_root,
743                    )
744                    .await?;
745
746                    broadcast_results.push((idx, result.status.clone()));
747                    debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
748
749                    // Check if this node accepted the payload
750                    if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
751                        first_valid_seen = true;
752                    } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
753                        debug!(
754                            "Node {}: Invalid payload status returned from broadcast: {:?}",
755                            idx, validation_error
756                        );
757                    }
758                }
759
760                // Update the executed payload state after broadcasting to all nodes
761                if first_valid_seen {
762                    env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
763                }
764
765                // Check if at least one node accepted the payload
766                let any_valid =
767                    broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
768                if !any_valid {
769                    return Err(eyre::eyre!(
770                        "Failed to successfully broadcast payload to any client"
771                    ));
772                }
773
774                debug!("Broadcast complete. Results: {:?}", broadcast_results);
775
776                Ok(())
777            }
778        })
779    }
780}
781
/// Action that produces a sequence of blocks using the available clients.
///
/// The number of blocks is fixed at construction time; the default value produces none.
#[derive(Debug)]
pub struct ProduceBlocks<Engine> {
    /// How many blocks the action produces when executed
    pub num_blocks: u64,
    /// Zero-sized marker binding the action to an engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocks<Engine> {
    /// Builds a `ProduceBlocks` action that produces `num_blocks` blocks.
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocks<Engine> {
    /// Returns an action configured to produce zero blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
803
804impl<Engine> Action<Engine> for ProduceBlocks<Engine>
805where
806    Engine: EngineTypes + PayloadTypes,
807    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
808    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
809{
810    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
811        Box::pin(async move {
812            for _ in 0..self.num_blocks {
813                // create a fresh sequence for each block to avoid state pollution
814                // Note: This produces blocks but does NOT make them canonical
815                // Use MakeCanonical action explicitly if canonicalization is needed
816                let mut sequence = Sequence::new(vec![
817                    Box::new(PickNextBlockProducer::default()),
818                    Box::new(GeneratePayloadAttributes::default()),
819                    Box::new(GenerateNextPayload::default()),
820                    Box::new(BroadcastNextNewPayload::default()),
821                    Box::new(UpdateBlockInfoToLatestPayload::default()),
822                ]);
823                sequence.execute(env).await?;
824            }
825            Ok(())
826        })
827    }
828}
829
830/// Action to test forkchoice update to a tagged block with expected status
831#[derive(Debug)]
832pub struct TestFcuToTag {
833    /// Tag name of the target block
834    pub tag: String,
835    /// Expected payload status
836    pub expected_status: PayloadStatusEnum,
837}
838
839impl TestFcuToTag {
840    /// Create a new `TestFcuToTag` action
841    pub fn new(tag: impl Into<String>, expected_status: PayloadStatusEnum) -> Self {
842        Self { tag: tag.into(), expected_status }
843    }
844}
845
846impl<Engine> Action<Engine> for TestFcuToTag
847where
848    Engine: EngineTypes,
849{
850    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
851        Box::pin(async move {
852            // get the target block from the registry
853            let (target_block, _node_idx) = env
854                .block_registry
855                .get(&self.tag)
856                .copied()
857                .ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
858
859            let engine_client = env.node_clients[0].engine.http_client();
860            let fcu_state = ForkchoiceState {
861                head_block_hash: target_block.hash,
862                safe_block_hash: target_block.hash,
863                finalized_block_hash: target_block.hash,
864            };
865
866            let fcu_response =
867                EngineApiClient::<Engine>::fork_choice_updated_v2(&engine_client, fcu_state, None)
868                    .await?;
869
870            // validate the response matches expected status
871            match (&fcu_response.payload_status.status, &self.expected_status) {
872                (PayloadStatusEnum::Valid, PayloadStatusEnum::Valid) => {
873                    debug!("FCU to '{}' returned VALID as expected", self.tag);
874                }
875                (PayloadStatusEnum::Invalid { .. }, PayloadStatusEnum::Invalid { .. }) => {
876                    debug!("FCU to '{}' returned INVALID as expected", self.tag);
877                }
878                (PayloadStatusEnum::Syncing, PayloadStatusEnum::Syncing) => {
879                    debug!("FCU to '{}' returned SYNCING as expected", self.tag);
880                }
881                (PayloadStatusEnum::Accepted, PayloadStatusEnum::Accepted) => {
882                    debug!("FCU to '{}' returned ACCEPTED as expected", self.tag);
883                }
884                (actual, expected) => {
885                    return Err(eyre::eyre!(
886                        "FCU to '{}': expected status {:?}, but got {:?}",
887                        self.tag,
888                        expected,
889                        actual
890                    ));
891                }
892            }
893
894            Ok(())
895        })
896    }
897}
898
899/// Action to expect a specific FCU status when targeting a tagged block
900#[derive(Debug)]
901pub struct ExpectFcuStatus {
902    /// Tag name of the target block
903    pub target_tag: String,
904    /// Expected payload status
905    pub expected_status: PayloadStatusEnum,
906}
907
908impl ExpectFcuStatus {
909    /// Create a new `ExpectFcuStatus` action expecting VALID status
910    pub fn valid(target_tag: impl Into<String>) -> Self {
911        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Valid }
912    }
913
914    /// Create a new `ExpectFcuStatus` action expecting INVALID status
915    pub fn invalid(target_tag: impl Into<String>) -> Self {
916        Self {
917            target_tag: target_tag.into(),
918            expected_status: PayloadStatusEnum::Invalid {
919                validation_error: "corrupted block".to_string(),
920            },
921        }
922    }
923
924    /// Create a new `ExpectFcuStatus` action expecting SYNCING status
925    pub fn syncing(target_tag: impl Into<String>) -> Self {
926        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Syncing }
927    }
928
929    /// Create a new `ExpectFcuStatus` action expecting ACCEPTED status
930    pub fn accepted(target_tag: impl Into<String>) -> Self {
931        Self { target_tag: target_tag.into(), expected_status: PayloadStatusEnum::Accepted }
932    }
933}
934
935impl<Engine> Action<Engine> for ExpectFcuStatus
936where
937    Engine: EngineTypes,
938{
939    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
940        Box::pin(async move {
941            let mut test_fcu = TestFcuToTag::new(&self.target_tag, self.expected_status.clone());
942            test_fcu.execute(env).await
943        })
944    }
945}
946
/// Action that checks a tagged block is still canonical by performing an FCU to it.
#[derive(Debug)]
pub struct ValidateCanonicalTag {
    /// Registry tag of the block expected to remain canonical
    pub tag: String,
}

impl ValidateCanonicalTag {
    /// Builds a `ValidateCanonicalTag` action for the block registered under `tag`.
    pub fn new(tag: impl Into<String>) -> Self {
        let tag = tag.into();
        Self { tag }
    }
}
960
961impl<Engine> Action<Engine> for ValidateCanonicalTag
962where
963    Engine: EngineTypes,
964{
965    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
966        Box::pin(async move {
967            let mut expect_valid = ExpectFcuStatus::valid(&self.tag);
968            expect_valid.execute(env).await?;
969
970            debug!("Successfully validated that '{}' remains canonical", self.tag);
971            Ok(())
972        })
973    }
974}
975
/// Action that produces blocks locally without broadcasting them to other nodes.
///
/// The payload is sent only to the active node so that it is available there.
#[derive(Debug)]
pub struct ProduceBlocksLocally<Engine> {
    /// How many blocks the action produces when executed
    pub num_blocks: u64,
    /// Zero-sized marker binding the action to an engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocksLocally<Engine> {
    /// Builds a `ProduceBlocksLocally` action that produces `num_blocks` blocks.
    pub fn new(num_blocks: u64) -> Self {
        Self { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocksLocally<Engine> {
    /// Returns an action configured to produce zero blocks.
    fn default() -> Self {
        Self { num_blocks: 0, _phantom: PhantomData }
    }
}
998
999impl<Engine> Action<Engine> for ProduceBlocksLocally<Engine>
1000where
1001    Engine: EngineTypes + PayloadTypes,
1002    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1003    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1004{
1005    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1006        Box::pin(async move {
1007            // Remember the active node to ensure all blocks are produced on the same node
1008            let producer_idx = env.active_node_idx;
1009
1010            for _ in 0..self.num_blocks {
1011                // Ensure we always use the same producer
1012                env.last_producer_idx = Some(producer_idx);
1013
1014                // create a sequence that produces blocks and sends only to active node
1015                let mut sequence = Sequence::new(vec![
1016                    // Skip PickNextBlockProducer to maintain the same producer
1017                    Box::new(GeneratePayloadAttributes::default()),
1018                    Box::new(GenerateNextPayload::default()),
1019                    // Send payload only to the active node to make it available
1020                    Box::new(BroadcastNextNewPayload::with_active_node()),
1021                    Box::new(UpdateBlockInfoToLatestPayload::default()),
1022                ]);
1023                sequence.execute(env).await?;
1024            }
1025            Ok(())
1026        })
1027    }
1028}
1029
/// Action that produces a sequence of blocks, some of which are intentionally invalid.
#[derive(Debug)]
pub struct ProduceInvalidBlocks<Engine> {
    /// How many blocks the action produces when executed
    pub num_blocks: u64,
    /// 0-based positions in the sequence whose blocks are made invalid
    pub invalid_indices: HashSet<u64>,
    /// Zero-sized marker binding the action to an engine type
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceInvalidBlocks<Engine> {
    /// Builds a `ProduceInvalidBlocks` action from a block count and a set of invalid indices.
    pub fn new(num_blocks: u64, invalid_indices: HashSet<u64>) -> Self {
        Self { num_blocks, invalid_indices, _phantom: PhantomData }
    }

    /// Builds a `ProduceInvalidBlocks` action whose only invalid block sits at `invalid_index`.
    pub fn with_invalid_at(num_blocks: u64, invalid_index: u64) -> Self {
        Self::new(num_blocks, HashSet::from([invalid_index]))
    }
}
1055
1056impl<Engine> Action<Engine> for ProduceInvalidBlocks<Engine>
1057where
1058    Engine: EngineTypes + PayloadTypes,
1059    Engine::PayloadAttributes: From<PayloadAttributes> + Clone,
1060    Engine::ExecutionPayloadEnvelopeV3: Into<ExecutionPayloadEnvelopeV3>,
1061{
1062    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
1063        Box::pin(async move {
1064            for block_index in 0..self.num_blocks {
1065                let is_invalid = self.invalid_indices.contains(&block_index);
1066
1067                if is_invalid {
1068                    debug!("Producing invalid block at index {}", block_index);
1069
1070                    // produce a valid block first, then corrupt it
1071                    let mut sequence = Sequence::new(vec![
1072                        Box::new(PickNextBlockProducer::default()),
1073                        Box::new(GeneratePayloadAttributes::default()),
1074                        Box::new(GenerateNextPayload::default()),
1075                    ]);
1076                    sequence.execute(env).await?;
1077
1078                    // get the latest payload and corrupt it
1079                    let latest_envelope =
1080                        env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
1081                            || eyre::eyre!("No payload envelope available to corrupt"),
1082                        )?;
1083
1084                    let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
1085                    let mut corrupted_payload = envelope_v3.execution_payload;
1086
1087                    // corrupt the state root to make the block invalid
1088                    corrupted_payload.payload_inner.payload_inner.state_root = B256::random();
1089
1090                    debug!(
1091                        "Corrupted state root for block {} to: {}",
1092                        block_index, corrupted_payload.payload_inner.payload_inner.state_root
1093                    );
1094
1095                    // send the corrupted payload via newPayload
1096                    let engine_client = env.node_clients[0].engine.http_client();
1097                    // for simplicity, we'll use empty versioned hashes for invalid block testing
1098                    let versioned_hashes = Vec::new();
1099                    // use a random parent beacon block root since this is for invalid block testing
1100                    let parent_beacon_block_root = B256::random();
1101
1102                    let new_payload_response = EngineApiClient::<Engine>::new_payload_v3(
1103                        &engine_client,
1104                        corrupted_payload.clone(),
1105                        versioned_hashes,
1106                        parent_beacon_block_root,
1107                    )
1108                    .await?;
1109
1110                    // expect the payload to be rejected as invalid
1111                    match new_payload_response.status {
1112                        PayloadStatusEnum::Invalid { validation_error } => {
1113                            debug!(
1114                                "Block {} correctly rejected as invalid: {:?}",
1115                                block_index, validation_error
1116                            );
1117                        }
1118                        other_status => {
1119                            return Err(eyre::eyre!(
1120                                "Expected block {} to be rejected as INVALID, but got: {:?}",
1121                                block_index,
1122                                other_status
1123                            ));
1124                        }
1125                    }
1126
1127                    // update block info with the corrupted block (for potential future reference)
1128                    env.set_current_block_info(BlockInfo {
1129                        hash: corrupted_payload.payload_inner.payload_inner.block_hash,
1130                        number: corrupted_payload.payload_inner.payload_inner.block_number,
1131                        timestamp: corrupted_payload.timestamp(),
1132                    })?;
1133                } else {
1134                    debug!("Producing valid block at index {}", block_index);
1135
1136                    // produce a valid block normally
1137                    let mut sequence = Sequence::new(vec![
1138                        Box::new(PickNextBlockProducer::default()),
1139                        Box::new(GeneratePayloadAttributes::default()),
1140                        Box::new(GenerateNextPayload::default()),
1141                        Box::new(BroadcastNextNewPayload::default()),
1142                        Box::new(UpdateBlockInfoToLatestPayload::default()),
1143                    ]);
1144                    sequence.execute(env).await?;
1145                }
1146            }
1147            Ok(())
1148        })
1149    }
1150}