1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV5, ForkchoiceState};
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::{core::client::ClientT, http_client::HttpClient};
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13 Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14 PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20 BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21 HeaderProvider, StageCheckpointReader,
22};
23use reth_rpc_api::TestingBuildBlockRequestV1;
24use reth_rpc_builder::auth::AuthServerHandle;
25use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
26use reth_stages_types::StageId;
27use std::pin::Pin;
28use tokio_stream::StreamExt;
29use url::Url;
30
31#[expect(missing_debug_implementations)]
33pub struct NodeTestContext<Node, AddOns>
34where
35 Node: FullNodeComponents,
36 AddOns: RethRpcAddOns<Node>,
37{
38 pub inner: FullNode<Node, AddOns>,
40 pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
42 pub network: NetworkTestContext<Node::Network>,
44 pub rpc: RpcTestContext<Node, AddOns::EthApi>,
46 pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
48}
49
50impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
51where
52 Payload: PayloadTypes,
53 Node: FullNodeComponents,
54 Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
55 Node::Network: PeersHandleProvider,
56 AddOns: RethRpcAddOns<Node>,
57{
58 pub async fn new(
60 node: FullNode<Node, AddOns>,
61 attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
62 ) -> eyre::Result<Self> {
63 Ok(Self {
64 inner: node.clone(),
65 payload: PayloadTestContext::new(
66 node.payload_builder_handle.clone(),
67 attributes_generator,
68 )
69 .await?,
70 network: NetworkTestContext::new(node.network.clone()),
71 rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
72 canonical_stream: node.provider.canonical_state_stream(),
73 })
74 }
75
76 pub async fn connect(&mut self, node: &mut Self) {
78 self.network.add_peer(node.network.record()).await;
79 node.network.next_session_established().await;
80 self.network.next_session_established().await;
81 }
82
83 pub async fn advance(
87 &mut self,
88 length: u64,
89 tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
90 ) -> eyre::Result<Vec<Payload::BuiltPayload>>
91 where
92 AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
93 + EthTransactions
94 + TraceExt,
95 {
96 let mut chain = Vec::with_capacity(length as usize);
97 for i in 0..length {
98 let raw_tx = tx_generator(i).await;
99 let tx_hash = self.rpc.inject_tx(raw_tx).await?;
100 let payload = self.advance_block().await?;
101 let block_hash = payload.block().hash();
102 let block_number = payload.block().number();
103 self.assert_new_block(tx_hash, block_hash, block_number).await?;
104 chain.push(payload);
105 }
106 Ok(chain)
107 }
108
109 pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
114 let eth_attr = self.payload.new_payload().await.unwrap();
116 self.payload.expect_attr_event(eth_attr.clone()).await?;
118 self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
120 Ok(self.payload.expect_built_payload().await?)
122 }
123
124 pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
126 let payload = self.new_payload().await?;
127
128 self.submit_payload(payload.clone()).await?;
129
130 Ok(payload)
131 }
132
133 pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
135 let payload = self.build_and_submit_payload().await?;
136
137 self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
139
140 Ok(payload)
141 }
142
143 pub async fn wait_block(
145 &self,
146 number: BlockNumber,
147 expected_block_hash: BlockHash,
148 wait_finish_checkpoint: bool,
149 ) -> eyre::Result<()> {
150 let mut check = !wait_finish_checkpoint;
151 loop {
152 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
153
154 if !check &&
155 wait_finish_checkpoint &&
156 let Some(checkpoint) =
157 self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
158 checkpoint.block_number >= number
159 {
160 check = true
161 }
162
163 if check {
164 if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
165 assert_eq!(latest_header.hash_slow(), expected_block_hash);
166 break
167 }
168 assert!(
169 !wait_finish_checkpoint,
170 "Finish checkpoint matches, but could not fetch block."
171 );
172 }
173 }
174 Ok(())
175 }
176
177 pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
179 loop {
180 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
181 if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
182 checkpoint.block_number == number
183 {
184 break
185 }
186 }
187 Ok(())
188 }
189
190 pub async fn assert_new_block(
195 &mut self,
196 tip_tx_hash: B256,
197 block_hash: B256,
198 block_number: BlockNumber,
199 ) -> eyre::Result<()> {
200 let head = self.canonical_stream.next().await.unwrap();
203 let tx = head.tip().body().transactions().first();
204 assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
205
206 loop {
207 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
209 if let Some(latest_block) =
210 self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
211 latest_block.header().number() == block_number
212 {
213 assert_eq!(latest_block.header().hash_slow(), block_hash);
216 break
217 }
218 }
219 Ok(())
220 }
221
222 pub fn block_hash(&self, number: u64) -> BlockHash {
224 self.inner
225 .provider
226 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
227 .unwrap()
228 .unwrap()
229 .hash()
230 }
231
232 pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
234 let start = std::time::Instant::now();
235
236 while self
237 .inner
238 .provider
239 .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
240 .is_none_or(|h| h.hash() != block)
241 {
242 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
243 self.update_forkchoice(block, block).await?;
244
245 assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
246 }
247
248 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
252
253 Ok(())
254 }
255
256 pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
258 self.inner
259 .add_ons_handle
260 .beacon_engine_handle
261 .fork_choice_updated(
262 ForkchoiceState {
263 head_block_hash: new_head,
264 safe_block_hash: current_head,
265 finalized_block_hash: current_head,
266 },
267 None,
268 EngineApiMessageVersion::default(),
269 )
270 .await?;
271
272 Ok(())
273 }
274
275 pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
277 self.update_forkchoice(B256::ZERO, hash).await
278 }
279
280 pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
282 let block_hash = payload.block().hash();
283 self.inner
284 .add_ons_handle
285 .beacon_engine_handle
286 .new_payload(Payload::block_to_payload(payload.block().clone()))
287 .await?;
288
289 Ok(block_hash)
290 }
291
292 pub fn rpc_url(&self) -> Url {
294 let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
295 format!("http://{addr}").parse().unwrap()
296 }
297
298 pub fn rpc_client(&self) -> Option<HttpClient> {
300 self.inner.rpc_server_handle().http_client()
301 }
302
303 pub fn auth_server_handle(&self) -> AuthServerHandle {
305 self.inner.auth_server_handle().clone()
306 }
307
308 pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
314 let rpc = self
315 .rpc_client()
316 .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
317 let auth = self.auth_server_handle();
318 let url = self.rpc_url();
319 let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
320
321 Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
322 }
323
324 pub async fn testing_build_block_v1(
329 &self,
330 request: TestingBuildBlockRequestV1,
331 ) -> eyre::Result<ExecutionPayloadEnvelopeV5> {
332 let client =
333 self.rpc_client().ok_or_else(|| eyre::eyre!("HTTP RPC client not available"))?;
334
335 let res: ExecutionPayloadEnvelopeV5 =
336 client.request("testing_buildBlockV1", [request]).await?;
337 eyre::Ok(res)
338 }
339}