reth_e2e_test_utils/testsuite/actions/
engine_api.rs1use crate::testsuite::{Action, Environment};
4use alloy_primitives::B256;
5use alloy_rpc_types_engine::{
6 ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, PayloadStatusEnum,
7};
8use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction, TransactionRequest};
9use eyre::Result;
10use futures_util::future::BoxFuture;
11use reth_node_api::{EngineTypes, PayloadTypes};
12use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
13use std::marker::PhantomData;
14use tracing::debug;
15
16#[derive(Debug)]
18pub struct SendNewPayload<Engine>
19where
20 Engine: EngineTypes,
21{
22 pub node_idx: usize,
24 pub block_number: u64,
26 pub source_node_idx: usize,
28 pub expected_status: ExpectedPayloadStatus,
30 _phantom: PhantomData<Engine>,
31}
32
/// Payload status a test expects a node to return for a `newPayload` call.
///
/// `PartialEq`/`Eq` are derived so expectations can be compared and used
/// directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpectedPayloadStatus {
    /// The node is expected to report the payload as valid.
    Valid,
    /// The node is expected to report the payload as invalid.
    Invalid,
    /// The node is expected to report the payload as either syncing or accepted.
    SyncingOrAccepted,
}
43
44impl<Engine> SendNewPayload<Engine>
45where
46 Engine: EngineTypes,
47{
48 pub fn new(
50 node_idx: usize,
51 block_number: u64,
52 source_node_idx: usize,
53 expected_status: ExpectedPayloadStatus,
54 ) -> Self {
55 Self {
56 node_idx,
57 block_number,
58 source_node_idx,
59 expected_status,
60 _phantom: Default::default(),
61 }
62 }
63}
64
65impl<Engine> Action<Engine> for SendNewPayload<Engine>
66where
67 Engine: EngineTypes + PayloadTypes,
68{
69 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
70 Box::pin(async move {
71 if self.node_idx >= env.node_clients.len() {
72 return Err(eyre::eyre!("Target node index out of bounds: {}", self.node_idx));
73 }
74 if self.source_node_idx >= env.node_clients.len() {
75 return Err(eyre::eyre!(
76 "Source node index out of bounds: {}",
77 self.source_node_idx
78 ));
79 }
80
81 let source_rpc = &env.node_clients[self.source_node_idx].rpc;
83 let mut block = None;
84 let mut retries = 0;
85 const MAX_RETRIES: u32 = 5;
86
87 while retries < MAX_RETRIES {
88 match EthApiClient::<TransactionRequest, Transaction, Block, Receipt, Header>::block_by_number(
89 source_rpc,
90 alloy_eips::BlockNumberOrTag::Number(self.block_number),
91 true, )
93 .await
94 {
95 Ok(Some(b)) => {
96 block = Some(b);
97 break;
98 }
99 Ok(None) => {
100 debug!(
101 "Block {} not found on source node {} (attempt {}/{})",
102 self.block_number,
103 self.source_node_idx,
104 retries + 1,
105 MAX_RETRIES
106 );
107 retries += 1;
108 if retries < MAX_RETRIES {
109 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
110 }
111 }
112 Err(e) => return Err(e.into()),
113 }
114 }
115
116 let block = block.ok_or_else(|| {
117 eyre::eyre!(
118 "Block {} not found on source node {} after {} retries",
119 self.block_number,
120 self.source_node_idx,
121 MAX_RETRIES
122 )
123 })?;
124
125 let payload = block_to_payload_v3(block.clone());
127
128 let target_engine = env.node_clients[self.node_idx].engine.http_client();
130 let result = EngineApiClient::<Engine>::new_payload_v3(
131 &target_engine,
132 payload,
133 vec![],
134 B256::ZERO, )
136 .await?;
137
138 debug!(
139 "Node {}: new_payload for block {} response - status: {:?}, latest_valid_hash: {:?}",
140 self.node_idx, self.block_number, result.status, result.latest_valid_hash
141 );
142
143 match (&result.status, &self.expected_status) {
145 (PayloadStatusEnum::Valid, ExpectedPayloadStatus::Valid) => {
146 debug!(
147 "Node {}: Block {} marked as VALID as expected",
148 self.node_idx, self.block_number
149 );
150 Ok(())
151 }
152 (
153 PayloadStatusEnum::Invalid { validation_error },
154 ExpectedPayloadStatus::Invalid,
155 ) => {
156 debug!(
157 "Node {}: Block {} marked as INVALID as expected: {:?}",
158 self.node_idx, self.block_number, validation_error
159 );
160 Ok(())
161 }
162 (
163 PayloadStatusEnum::Syncing | PayloadStatusEnum::Accepted,
164 ExpectedPayloadStatus::SyncingOrAccepted,
165 ) => {
166 debug!(
167 "Node {}: Block {} marked as SYNCING/ACCEPTED as expected (buffered)",
168 self.node_idx, self.block_number
169 );
170 Ok(())
171 }
172 (status, expected) => Err(eyre::eyre!(
173 "Node {}: Unexpected payload status for block {}. Got {:?}, expected {:?}",
174 self.node_idx,
175 self.block_number,
176 status,
177 expected
178 )),
179 }
180 })
181 }
182}
183
184#[derive(Debug)]
186pub struct SendNewPayloads<Engine>
187where
188 Engine: EngineTypes,
189{
190 target_node: Option<usize>,
192 source_node: Option<usize>,
194 start_block: Option<u64>,
196 total_blocks: Option<u64>,
198 reverse_order: bool,
200 custom_block_numbers: Option<Vec<u64>>,
202 _phantom: PhantomData<Engine>,
203}
204
205impl<Engine> SendNewPayloads<Engine>
206where
207 Engine: EngineTypes,
208{
209 pub fn new() -> Self {
211 Self {
212 target_node: None,
213 source_node: None,
214 start_block: None,
215 total_blocks: None,
216 reverse_order: false,
217 custom_block_numbers: None,
218 _phantom: Default::default(),
219 }
220 }
221
222 pub const fn with_target_node(mut self, node_idx: usize) -> Self {
224 self.target_node = Some(node_idx);
225 self
226 }
227
228 pub const fn with_source_node(mut self, node_idx: usize) -> Self {
230 self.source_node = Some(node_idx);
231 self
232 }
233
234 pub const fn with_start_block(mut self, block_num: u64) -> Self {
236 self.start_block = Some(block_num);
237 self
238 }
239
240 pub const fn with_total_blocks(mut self, count: u64) -> Self {
242 self.total_blocks = Some(count);
243 self
244 }
245
246 pub const fn in_reverse_order(mut self) -> Self {
248 self.reverse_order = true;
249 self
250 }
251
252 pub fn with_block_numbers(mut self, block_numbers: Vec<u64>) -> Self {
254 self.custom_block_numbers = Some(block_numbers);
255 self
256 }
257}
258
259impl<Engine> Default for SendNewPayloads<Engine>
260where
261 Engine: EngineTypes,
262{
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268impl<Engine> Action<Engine> for SendNewPayloads<Engine>
269where
270 Engine: EngineTypes + PayloadTypes,
271{
272 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
273 Box::pin(async move {
274 let target_node =
276 self.target_node.ok_or_else(|| eyre::eyre!("Target node not specified"))?;
277 let source_node =
278 self.source_node.ok_or_else(|| eyre::eyre!("Source node not specified"))?;
279
280 let block_numbers = if let Some(custom_numbers) = &self.custom_block_numbers {
282 custom_numbers.clone()
283 } else {
284 let start =
285 self.start_block.ok_or_else(|| eyre::eyre!("Start block not specified"))?;
286 let count =
287 self.total_blocks.ok_or_else(|| eyre::eyre!("Total blocks not specified"))?;
288
289 if self.reverse_order {
290 (0..count).map(|i| start + count - 1 - i).collect()
292 } else {
293 (0..count).map(|i| start + i).collect()
295 }
296 };
297
298 for &block_number in &block_numbers {
299 let expected_status =
302 if self.reverse_order && block_number == *block_numbers.first().unwrap() {
303 ExpectedPayloadStatus::SyncingOrAccepted
304 } else {
305 ExpectedPayloadStatus::Valid
306 };
307
308 let mut action = SendNewPayload::<Engine>::new(
309 target_node,
310 block_number,
311 source_node,
312 expected_status,
313 );
314
315 action.execute(env).await?;
316 }
317
318 Ok(())
319 })
320 }
321}
322
323fn block_to_payload_v3(block: Block) -> ExecutionPayloadV3 {
325 use alloy_primitives::U256;
326
327 ExecutionPayloadV3 {
328 payload_inner: ExecutionPayloadV2 {
329 payload_inner: ExecutionPayloadV1 {
330 parent_hash: block.header.inner.parent_hash,
331 fee_recipient: block.header.inner.beneficiary,
332 state_root: block.header.inner.state_root,
333 receipts_root: block.header.inner.receipts_root,
334 logs_bloom: block.header.inner.logs_bloom,
335 prev_randao: block.header.inner.mix_hash,
336 block_number: block.header.inner.number,
337 gas_limit: block.header.inner.gas_limit,
338 gas_used: block.header.inner.gas_used,
339 timestamp: block.header.inner.timestamp,
340 extra_data: block.header.inner.extra_data.clone(),
341 base_fee_per_gas: U256::from(block.header.inner.base_fee_per_gas.unwrap_or(0)),
342 block_hash: block.header.hash,
343 transactions: vec![], },
345 withdrawals: block.withdrawals.unwrap_or_default().to_vec(),
346 },
347 blob_gas_used: block.header.inner.blob_gas_used.unwrap_or(0),
348 excess_blob_gas: block.header.inner.excess_blob_gas.unwrap_or(0),
349 }
350}