reth_e2e_test_utils/testsuite/actions/
engine_api.rs

1//! Engine API specific actions for testing.
2
3use crate::testsuite::{Action, Environment};
4use alloy_primitives::B256;
5use alloy_rpc_types_engine::{
6    ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, PayloadStatusEnum,
7};
8use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
9use eyre::Result;
10use futures_util::future::BoxFuture;
11use reth_node_api::{EngineTypes, PayloadTypes};
12use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
13use std::marker::PhantomData;
14use tracing::debug;
15
/// Action that sends a newPayload request to a specific node.
///
/// When executed, the block with number `block_number` is fetched from the source node,
/// converted into an `ExecutionPayloadV3` and submitted to the target node. The status
/// reported by the target is then compared against `expected_status`.
#[derive(Debug)]
pub struct SendNewPayload<Engine>
where
    Engine: EngineTypes,
{
    /// The node index to send to (index into the environment's node clients)
    pub node_idx: usize,
    /// The block number to send
    pub block_number: u64,
    /// The source node to get the block from (index into the environment's node clients)
    pub source_node_idx: usize,
    /// Expected payload status; any other reported status makes the action fail
    pub expected_status: ExpectedPayloadStatus,
    /// Zero-sized marker that ties the action to its `Engine` type parameter
    _phantom: PhantomData<Engine>,
}
32
/// Expected status for a payload
///
/// Describes which status a node is expected to report after a payload has been submitted
/// to it. The enum carries no data, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpectedPayloadStatus {
    /// Expect the payload to be valid
    Valid,
    /// Expect the payload to be invalid
    Invalid,
    /// Expect the payload to be syncing or accepted (buffered)
    SyncingOrAccepted,
}
43
44impl<Engine> SendNewPayload<Engine>
45where
46    Engine: EngineTypes,
47{
48    /// Create a new `SendNewPayload` action
49    pub fn new(
50        node_idx: usize,
51        block_number: u64,
52        source_node_idx: usize,
53        expected_status: ExpectedPayloadStatus,
54    ) -> Self {
55        Self {
56            node_idx,
57            block_number,
58            source_node_idx,
59            expected_status,
60            _phantom: Default::default(),
61        }
62    }
63}
64
65impl<Engine> Action<Engine> for SendNewPayload<Engine>
66where
67    Engine: EngineTypes + PayloadTypes,
68{
69    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
70        Box::pin(async move {
71            if self.node_idx >= env.node_clients.len() {
72                return Err(eyre::eyre!("Target node index out of bounds: {}", self.node_idx));
73            }
74            if self.source_node_idx >= env.node_clients.len() {
75                return Err(eyre::eyre!(
76                    "Source node index out of bounds: {}",
77                    self.source_node_idx
78                ));
79            }
80
81            // Get the block from the source node with retries
82            let source_rpc = &env.node_clients[self.source_node_idx].rpc;
83            let mut block = None;
84            let mut retries = 0;
85            const MAX_RETRIES: u32 = 5;
86
87            while retries < MAX_RETRIES {
88                match EthApiClient::<TransactionRequest, Transaction, Block, Receipt, Header>::block_by_number(
89                    source_rpc,
90                    alloy_eips::BlockNumberOrTag::Number(self.block_number),
91                    true, // include transactions
92                )
93                .await
94                {
95                    Ok(Some(b)) => {
96                        block = Some(b);
97                        break;
98                    }
99                    Ok(None) => {
100                        debug!(
101                            "Block {} not found on source node {} (attempt {}/{})",
102                            self.block_number,
103                            self.source_node_idx,
104                            retries + 1,
105                            MAX_RETRIES
106                        );
107                        retries += 1;
108                        if retries < MAX_RETRIES {
109                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
110                        }
111                    }
112                    Err(e) => return Err(e.into()),
113                }
114            }
115
116            let block = block.ok_or_else(|| {
117                eyre::eyre!(
118                    "Block {} not found on source node {} after {} retries",
119                    self.block_number,
120                    self.source_node_idx,
121                    MAX_RETRIES
122                )
123            })?;
124
125            // Convert block to ExecutionPayloadV3
126            let payload = block_to_payload_v3(block.clone());
127
128            // Send the payload to the target node
129            let target_engine = env.node_clients[self.node_idx].engine.http_client();
130            let result = EngineApiClient::<Engine>::new_payload_v3(
131                &target_engine,
132                payload,
133                vec![],
134                B256::ZERO, // parent_beacon_block_root
135            )
136            .await?;
137
138            debug!(
139                "Node {}: new_payload for block {} response - status: {:?}, latest_valid_hash: {:?}",
140                self.node_idx, self.block_number, result.status, result.latest_valid_hash
141            );
142
143            // Validate the response based on expectations
144            match (&result.status, &self.expected_status) {
145                (PayloadStatusEnum::Valid, ExpectedPayloadStatus::Valid) => {
146                    debug!(
147                        "Node {}: Block {} marked as VALID as expected",
148                        self.node_idx, self.block_number
149                    );
150                    Ok(())
151                }
152                (
153                    PayloadStatusEnum::Invalid { validation_error },
154                    ExpectedPayloadStatus::Invalid,
155                ) => {
156                    debug!(
157                        "Node {}: Block {} marked as INVALID as expected: {:?}",
158                        self.node_idx, self.block_number, validation_error
159                    );
160                    Ok(())
161                }
162                (
163                    PayloadStatusEnum::Syncing | PayloadStatusEnum::Accepted,
164                    ExpectedPayloadStatus::SyncingOrAccepted,
165                ) => {
166                    debug!(
167                        "Node {}: Block {} marked as SYNCING/ACCEPTED as expected (buffered)",
168                        self.node_idx, self.block_number
169                    );
170                    Ok(())
171                }
172                (status, expected) => Err(eyre::eyre!(
173                    "Node {}: Unexpected payload status for block {}. Got {:?}, expected {:?}",
174                    self.node_idx,
175                    self.block_number,
176                    status,
177                    expected
178                )),
179            }
180        })
181    }
182}
183
/// Action that sends multiple blocks to a node in a specific order.
///
/// Configured through the builder methods on this type. The blocks to send are either the
/// explicit list in `custom_block_numbers` or, when that is unset, the `total_blocks` blocks
/// starting at `start_block`. Each block is forwarded with a `SendNewPayload` action.
#[derive(Debug)]
pub struct SendNewPayloads<Engine>
where
    Engine: EngineTypes,
{
    /// The node index to send to (required; execution fails when unset)
    target_node: Option<usize>,
    /// The source node to get the blocks from (required; execution fails when unset)
    source_node: Option<usize>,
    /// The starting block number (required unless custom block numbers are set)
    start_block: Option<u64>,
    /// The total number of blocks to send (required unless custom block numbers are set)
    total_blocks: Option<u64>,
    /// Whether to send in reverse order
    reverse_order: bool,
    /// Custom block numbers to send (if not using `start_block` + `total_blocks`)
    custom_block_numbers: Option<Vec<u64>>,
    /// Zero-sized marker that ties the action to its `Engine` type parameter
    _phantom: PhantomData<Engine>,
}
204
205impl<Engine> SendNewPayloads<Engine>
206where
207    Engine: EngineTypes,
208{
209    /// Create a new `SendNewPayloads` action builder
210    pub fn new() -> Self {
211        Self {
212            target_node: None,
213            source_node: None,
214            start_block: None,
215            total_blocks: None,
216            reverse_order: false,
217            custom_block_numbers: None,
218            _phantom: Default::default(),
219        }
220    }
221
222    /// Set the target node index
223    pub const fn with_target_node(mut self, node_idx: usize) -> Self {
224        self.target_node = Some(node_idx);
225        self
226    }
227
228    /// Set the source node index
229    pub const fn with_source_node(mut self, node_idx: usize) -> Self {
230        self.source_node = Some(node_idx);
231        self
232    }
233
234    /// Set the starting block number
235    pub const fn with_start_block(mut self, block_num: u64) -> Self {
236        self.start_block = Some(block_num);
237        self
238    }
239
240    /// Set the total number of blocks to send
241    pub const fn with_total_blocks(mut self, count: u64) -> Self {
242        self.total_blocks = Some(count);
243        self
244    }
245
246    /// Send blocks in reverse order (useful for testing buffering)
247    pub const fn in_reverse_order(mut self) -> Self {
248        self.reverse_order = true;
249        self
250    }
251
252    /// Set custom block numbers to send
253    pub fn with_block_numbers(mut self, block_numbers: Vec<u64>) -> Self {
254        self.custom_block_numbers = Some(block_numbers);
255        self
256    }
257}
258
impl<Engine> Default for SendNewPayloads<Engine>
where
    Engine: EngineTypes,
{
    /// Returns the same unconfigured builder as [`SendNewPayloads::new`].
    fn default() -> Self {
        Self::new()
    }
}
267
268impl<Engine> Action<Engine> for SendNewPayloads<Engine>
269where
270    Engine: EngineTypes + PayloadTypes,
271{
272    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
273        Box::pin(async move {
274            // Validate required fields
275            let target_node =
276                self.target_node.ok_or_else(|| eyre::eyre!("Target node not specified"))?;
277            let source_node =
278                self.source_node.ok_or_else(|| eyre::eyre!("Source node not specified"))?;
279
280            // Determine block numbers to send
281            let block_numbers = if let Some(custom_numbers) = &self.custom_block_numbers {
282                custom_numbers.clone()
283            } else {
284                let start =
285                    self.start_block.ok_or_else(|| eyre::eyre!("Start block not specified"))?;
286                let count =
287                    self.total_blocks.ok_or_else(|| eyre::eyre!("Total blocks not specified"))?;
288
289                if self.reverse_order {
290                    // Send blocks in reverse order (e.g., for count=2, start=1: [2, 1])
291                    (0..count).map(|i| start + count - 1 - i).collect()
292                } else {
293                    // Send blocks in normal order
294                    (0..count).map(|i| start + i).collect()
295                }
296            };
297
298            for &block_number in &block_numbers {
299                // For the first block in reverse order, expect buffering
300                // For subsequent blocks, they might connect immediately
301                let expected_status =
302                    if self.reverse_order && block_number == *block_numbers.first().unwrap() {
303                        ExpectedPayloadStatus::SyncingOrAccepted
304                    } else {
305                        ExpectedPayloadStatus::Valid
306                    };
307
308                let mut action = SendNewPayload::<Engine>::new(
309                    target_node,
310                    block_number,
311                    source_node,
312                    expected_status,
313                );
314
315                action.execute(env).await?;
316            }
317
318            Ok(())
319        })
320    }
321}
322
323/// Helper function to convert a block to `ExecutionPayloadV3`
324fn block_to_payload_v3(block: Block) -> ExecutionPayloadV3 {
325    use alloy_primitives::U256;
326
327    ExecutionPayloadV3 {
328        payload_inner: ExecutionPayloadV2 {
329            payload_inner: ExecutionPayloadV1 {
330                parent_hash: block.header.inner.parent_hash,
331                fee_recipient: block.header.inner.beneficiary,
332                state_root: block.header.inner.state_root,
333                receipts_root: block.header.inner.receipts_root,
334                logs_bloom: block.header.inner.logs_bloom,
335                prev_randao: block.header.inner.mix_hash,
336                block_number: block.header.inner.number,
337                gas_limit: block.header.inner.gas_limit,
338                gas_used: block.header.inner.gas_used,
339                timestamp: block.header.inner.timestamp,
340                extra_data: block.header.inner.extra_data.clone(),
341                base_fee_per_gas: U256::from(block.header.inner.base_fee_per_gas.unwrap_or(0)),
342                block_hash: block.header.hash,
343                transactions: vec![], // No transactions needed for buffering tests
344            },
345            withdrawals: block.withdrawals.unwrap_or_default().to_vec(),
346        },
347        blob_gas_used: block.header.inner.blob_gas_used.unwrap_or(0),
348        excess_blob_gas: block.header.inner.excess_blob_gas.unwrap_or(0),
349    }
350}