reth_e2e_test_utils/testsuite/actions/
engine_api.rs1use crate::testsuite::{Action, Environment};
4use alloy_primitives::B256;
5use alloy_rpc_types_engine::{ExecutionPayloadV3, PayloadStatusEnum};
6use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
7use eyre::Result;
8use futures_util::future::BoxFuture;
9use reth_ethereum_primitives::TransactionSigned;
10use reth_node_api::{EngineTypes, PayloadTypes};
11use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
12use std::marker::PhantomData;
13use tracing::debug;
14
15#[derive(Debug)]
17pub struct SendNewPayload<Engine>
18where
19 Engine: EngineTypes,
20{
21 pub node_idx: usize,
23 pub block_number: u64,
25 pub source_node_idx: usize,
27 pub expected_status: ExpectedPayloadStatus,
29 _phantom: PhantomData<Engine>,
30}
31
/// Payload status that a [`SendNewPayload`] action expects the target node to report.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers can test which expectation an action
/// carries without pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpectedPayloadStatus {
    /// The node must answer with `PayloadStatusEnum::Valid`.
    Valid,
    /// The node must answer with `PayloadStatusEnum::Invalid`.
    Invalid,
    /// The node must answer with either `PayloadStatusEnum::Syncing` or
    /// `PayloadStatusEnum::Accepted`.
    SyncingOrAccepted,
}
42
43impl<Engine> SendNewPayload<Engine>
44where
45 Engine: EngineTypes,
46{
47 pub fn new(
49 node_idx: usize,
50 block_number: u64,
51 source_node_idx: usize,
52 expected_status: ExpectedPayloadStatus,
53 ) -> Self {
54 Self {
55 node_idx,
56 block_number,
57 source_node_idx,
58 expected_status,
59 _phantom: Default::default(),
60 }
61 }
62}
63
64impl<Engine> Action<Engine> for SendNewPayload<Engine>
65where
66 Engine: EngineTypes + PayloadTypes,
67{
68 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
69 Box::pin(async move {
70 if self.node_idx >= env.node_clients.len() {
71 return Err(eyre::eyre!("Target node index out of bounds: {}", self.node_idx));
72 }
73 if self.source_node_idx >= env.node_clients.len() {
74 return Err(eyre::eyre!(
75 "Source node index out of bounds: {}",
76 self.source_node_idx
77 ));
78 }
79
80 let source_rpc = &env.node_clients[self.source_node_idx].rpc;
82 let mut block = None;
83 let mut retries = 0;
84 const MAX_RETRIES: u32 = 5;
85
86 while retries < MAX_RETRIES {
87 match EthApiClient::<
88 TransactionRequest,
89 Transaction,
90 Block,
91 Receipt,
92 Header,
93 TransactionSigned,
94 >::block_by_number(
95 source_rpc,
96 alloy_eips::BlockNumberOrTag::Number(self.block_number),
97 true, )
99 .await
100 {
101 Ok(Some(b)) => {
102 block = Some(b);
103 break;
104 }
105 Ok(None) => {
106 debug!(
107 "Block {} not found on source node {} (attempt {}/{})",
108 self.block_number,
109 self.source_node_idx,
110 retries + 1,
111 MAX_RETRIES
112 );
113 retries += 1;
114 if retries < MAX_RETRIES {
115 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
116 }
117 }
118 Err(e) => return Err(e.into()),
119 }
120 }
121
122 let block = block.ok_or_else(|| {
123 eyre::eyre!(
124 "Block {} not found on source node {} after {} retries",
125 self.block_number,
126 self.source_node_idx,
127 MAX_RETRIES
128 )
129 })?;
130
131 let payload = ExecutionPayloadV3::from_block_unchecked(
133 block.hash(),
134 &block.map_transactions(|tx| tx.inner).into_consensus(),
135 );
136
137 let target_engine = env.node_clients[self.node_idx].engine.http_client();
139 let result = EngineApiClient::<Engine>::new_payload_v3(
140 &target_engine,
141 payload,
142 vec![],
143 B256::ZERO, )
145 .await?;
146
147 debug!(
148 "Node {}: new_payload for block {} response - status: {:?}, latest_valid_hash: {:?}",
149 self.node_idx, self.block_number, result.status, result.latest_valid_hash
150 );
151
152 match (&result.status, &self.expected_status) {
154 (PayloadStatusEnum::Valid, ExpectedPayloadStatus::Valid) => {
155 debug!(
156 "Node {}: Block {} marked as VALID as expected",
157 self.node_idx, self.block_number
158 );
159 Ok(())
160 }
161 (
162 PayloadStatusEnum::Invalid { validation_error },
163 ExpectedPayloadStatus::Invalid,
164 ) => {
165 debug!(
166 "Node {}: Block {} marked as INVALID as expected: {:?}",
167 self.node_idx, self.block_number, validation_error
168 );
169 Ok(())
170 }
171 (
172 PayloadStatusEnum::Syncing | PayloadStatusEnum::Accepted,
173 ExpectedPayloadStatus::SyncingOrAccepted,
174 ) => {
175 debug!(
176 "Node {}: Block {} marked as SYNCING/ACCEPTED as expected (buffered)",
177 self.node_idx, self.block_number
178 );
179 Ok(())
180 }
181 (status, expected) => Err(eyre::eyre!(
182 "Node {}: Unexpected payload status for block {}. Got {:?}, expected {:?}",
183 self.node_idx,
184 self.block_number,
185 status,
186 expected
187 )),
188 }
189 })
190 }
191}
192
193#[derive(Debug)]
195pub struct SendNewPayloads<Engine>
196where
197 Engine: EngineTypes,
198{
199 target_node: Option<usize>,
201 source_node: Option<usize>,
203 start_block: Option<u64>,
205 total_blocks: Option<u64>,
207 reverse_order: bool,
209 custom_block_numbers: Option<Vec<u64>>,
211 _phantom: PhantomData<Engine>,
212}
213
214impl<Engine> SendNewPayloads<Engine>
215where
216 Engine: EngineTypes,
217{
218 pub fn new() -> Self {
220 Self {
221 target_node: None,
222 source_node: None,
223 start_block: None,
224 total_blocks: None,
225 reverse_order: false,
226 custom_block_numbers: None,
227 _phantom: Default::default(),
228 }
229 }
230
231 pub const fn with_target_node(mut self, node_idx: usize) -> Self {
233 self.target_node = Some(node_idx);
234 self
235 }
236
237 pub const fn with_source_node(mut self, node_idx: usize) -> Self {
239 self.source_node = Some(node_idx);
240 self
241 }
242
243 pub const fn with_start_block(mut self, block_num: u64) -> Self {
245 self.start_block = Some(block_num);
246 self
247 }
248
249 pub const fn with_total_blocks(mut self, count: u64) -> Self {
251 self.total_blocks = Some(count);
252 self
253 }
254
255 pub const fn in_reverse_order(mut self) -> Self {
257 self.reverse_order = true;
258 self
259 }
260
261 pub fn with_block_numbers(mut self, block_numbers: Vec<u64>) -> Self {
263 self.custom_block_numbers = Some(block_numbers);
264 self
265 }
266}
267
268impl<Engine> Default for SendNewPayloads<Engine>
269where
270 Engine: EngineTypes,
271{
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277impl<Engine> Action<Engine> for SendNewPayloads<Engine>
278where
279 Engine: EngineTypes + PayloadTypes,
280{
281 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
282 Box::pin(async move {
283 let target_node =
285 self.target_node.ok_or_else(|| eyre::eyre!("Target node not specified"))?;
286 let source_node =
287 self.source_node.ok_or_else(|| eyre::eyre!("Source node not specified"))?;
288
289 let block_numbers = if let Some(custom_numbers) = &self.custom_block_numbers {
291 custom_numbers.clone()
292 } else {
293 let start =
294 self.start_block.ok_or_else(|| eyre::eyre!("Start block not specified"))?;
295 let count =
296 self.total_blocks.ok_or_else(|| eyre::eyre!("Total blocks not specified"))?;
297
298 if self.reverse_order {
299 (0..count).map(|i| start + count - 1 - i).collect()
301 } else {
302 (0..count).map(|i| start + i).collect()
304 }
305 };
306
307 for &block_number in &block_numbers {
308 let expected_status =
311 if self.reverse_order && block_number == *block_numbers.first().unwrap() {
312 ExpectedPayloadStatus::SyncingOrAccepted
313 } else {
314 ExpectedPayloadStatus::Valid
315 };
316
317 let mut action = SendNewPayload::<Engine>::new(
318 target_node,
319 block_number,
320 source_node,
321 expected_status,
322 );
323
324 action.execute(env).await?;
325 }
326
327 Ok(())
328 })
329 }
330}