1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV5, ForkchoiceState};
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::{core::client::ClientT, http_client::HttpClient};
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{Block, BlockBody, BlockTy, FullNodeComponents, PayloadTypes, PrimitivesTy};
13use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
14
15use reth_payload_primitives::BuiltPayload;
16use reth_provider::{
17 BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
18 HeaderProvider, StageCheckpointReader,
19};
20use reth_rpc_api::TestingBuildBlockRequestV1;
21use reth_rpc_builder::auth::AuthServerHandle;
22use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
23use reth_stages_types::StageId;
24use std::pin::Pin;
25use tokio_stream::StreamExt;
26use url::Url;
27
28#[expect(missing_debug_implementations)]
30pub struct NodeTestContext<Node, AddOns>
31where
32 Node: FullNodeComponents,
33 AddOns: RethRpcAddOns<Node>,
34{
35 pub inner: FullNode<Node, AddOns>,
37 pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
39 pub network: NetworkTestContext<Node::Network>,
41 pub rpc: RpcTestContext<Node, AddOns::EthApi>,
43 pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
45}
46
47impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
48where
49 Payload: PayloadTypes,
50 Node: FullNodeComponents,
51 Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
52 Node::Network: PeersHandleProvider,
53 AddOns: RethRpcAddOns<Node>,
54{
55 pub async fn new(
57 node: FullNode<Node, AddOns>,
58 attributes_generator: impl Fn(u64) -> Payload::PayloadAttributes + Send + Sync + 'static,
59 ) -> eyre::Result<Self> {
60 Ok(Self {
61 inner: node.clone(),
62 payload: PayloadTestContext::new(
63 node.payload_builder_handle.clone(),
64 attributes_generator,
65 )
66 .await?,
67 network: NetworkTestContext::new(node.network.clone()),
68 rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
69 canonical_stream: node.provider.canonical_state_stream(),
70 })
71 }
72
73 pub async fn connect(&mut self, node: &mut Self) {
75 self.network.add_peer(node.network.record()).await;
76 node.network.next_session_established().await;
77 self.network.next_session_established().await;
78 }
79
80 pub async fn advance(
84 &mut self,
85 length: u64,
86 tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
87 ) -> eyre::Result<Vec<Payload::BuiltPayload>>
88 where
89 AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
90 + EthTransactions
91 + TraceExt,
92 {
93 let mut chain = Vec::with_capacity(length as usize);
94 for i in 0..length {
95 let raw_tx = tx_generator(i).await;
96 let tx_hash = self.rpc.inject_tx(raw_tx).await?;
97 let payload = self.advance_block().await?;
98 let block_hash = payload.block().hash();
99 let block_number = payload.block().number();
100 self.assert_new_block(tx_hash, block_hash, block_number).await?;
101 chain.push(payload);
102 }
103 Ok(chain)
104 }
105
106 pub fn current_forkchoice_state(&self) -> eyre::Result<ForkchoiceState> {
108 let latest_header =
109 self.inner.provider.sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)?.unwrap();
110
111 if latest_header.number() == 0 {
112 return Ok(ForkchoiceState::same_hash(latest_header.hash()));
113 }
114
115 Ok(ForkchoiceState {
116 head_block_hash: latest_header.hash(),
117 safe_block_hash: self
118 .inner
119 .provider
120 .sealed_header_by_number_or_tag(BlockNumberOrTag::Safe)?
121 .unwrap()
122 .hash(),
123 finalized_block_hash: self
124 .inner
125 .provider
126 .sealed_header_by_number_or_tag(BlockNumberOrTag::Finalized)?
127 .unwrap()
128 .hash(),
129 })
130 }
131
132 pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
137 let eth_attr = self.payload.next_attributes();
138 let payload_id = self
139 .inner
140 .add_ons_handle
141 .beacon_engine_handle
142 .fork_choice_updated(self.current_forkchoice_state()?, Some(eth_attr.clone()))
143 .await?
144 .payload_id
145 .unwrap();
146 self.payload.expect_attr_event(eth_attr).await?;
148 self.payload.wait_for_built_payload(payload_id).await;
150 Ok(self.payload.expect_built_payload().await?)
152 }
153
154 pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
156 let payload = self.new_payload().await?;
157
158 self.submit_payload(payload.clone()).await?;
159
160 Ok(payload)
161 }
162
163 pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
165 let payload = self.build_and_submit_payload().await?;
166
167 self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
169
170 Ok(payload)
171 }
172
173 pub async fn wait_block(
175 &self,
176 number: BlockNumber,
177 expected_block_hash: BlockHash,
178 wait_finish_checkpoint: bool,
179 ) -> eyre::Result<()> {
180 let mut check = !wait_finish_checkpoint;
181 loop {
182 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
183
184 if !check &&
185 wait_finish_checkpoint &&
186 let Some(checkpoint) =
187 self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
188 checkpoint.block_number >= number
189 {
190 check = true
191 }
192
193 if check {
194 if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
195 assert_eq!(latest_header.hash_slow(), expected_block_hash);
196 break
197 }
198 assert!(
199 !wait_finish_checkpoint,
200 "Finish checkpoint matches, but could not fetch block."
201 );
202 }
203 }
204 Ok(())
205 }
206
207 pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
209 loop {
210 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
211 if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
212 checkpoint.block_number == number
213 {
214 break
215 }
216 }
217 Ok(())
218 }
219
220 pub async fn assert_new_block(
225 &mut self,
226 tip_tx_hash: B256,
227 block_hash: B256,
228 block_number: BlockNumber,
229 ) -> eyre::Result<()> {
230 let head = self.canonical_stream.next().await.unwrap();
233 let tx = head.tip().body().transactions().first();
234 assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
235
236 loop {
237 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
239 if let Some(latest_block) =
240 self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
241 latest_block.header().number() == block_number
242 {
243 assert_eq!(latest_block.header().hash_slow(), block_hash);
246 break
247 }
248 }
249 Ok(())
250 }
251
252 pub fn block_hash(&self, number: u64) -> BlockHash {
254 self.inner
255 .provider
256 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
257 .unwrap()
258 .unwrap()
259 .hash()
260 }
261
262 pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
264 let start = std::time::Instant::now();
265
266 while self
267 .inner
268 .provider
269 .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
270 .is_none_or(|h| h.hash() != block)
271 {
272 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
273 self.update_forkchoice(block, block).await?;
274
275 assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
276 }
277
278 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
282
283 Ok(())
284 }
285
286 pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
288 self.inner
289 .add_ons_handle
290 .beacon_engine_handle
291 .fork_choice_updated(
292 ForkchoiceState {
293 head_block_hash: new_head,
294 safe_block_hash: current_head,
295 finalized_block_hash: current_head,
296 },
297 None,
298 )
299 .await?;
300
301 Ok(())
302 }
303
304 pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
306 self.update_forkchoice(B256::ZERO, hash).await
307 }
308
309 pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
311 let block_hash = payload.block().hash();
312 self.inner
313 .add_ons_handle
314 .beacon_engine_handle
315 .new_payload(Payload::block_to_payload(payload.block().clone()))
316 .await?;
317
318 Ok(block_hash)
319 }
320
321 pub fn rpc_url(&self) -> Url {
323 let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
324 format!("http://{addr}").parse().unwrap()
325 }
326
327 pub fn rpc_client(&self) -> Option<HttpClient> {
329 self.inner.rpc_server_handle().http_client()
330 }
331
332 pub fn auth_server_handle(&self) -> AuthServerHandle {
334 self.inner.auth_server_handle().clone()
335 }
336
337 pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
343 let rpc = self
344 .rpc_client()
345 .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
346 let auth = self.auth_server_handle();
347 let url = self.rpc_url();
348 let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
349
350 Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
351 }
352
353 pub async fn testing_build_block_v1(
358 &self,
359 request: TestingBuildBlockRequestV1,
360 ) -> eyre::Result<ExecutionPayloadEnvelopeV5> {
361 let client =
362 self.rpc_client().ok_or_else(|| eyre::eyre!("HTTP RPC client not available"))?;
363
364 let res: ExecutionPayloadEnvelopeV5 =
365 client.request("testing_buildBlockV1", [request]).await?;
366 eyre::Ok(res)
367 }
368}