1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::http_client::HttpClient;
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13 Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14 PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20 BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21 HeaderProvider, StageCheckpointReader,
22};
23use reth_rpc_builder::auth::AuthServerHandle;
24use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
25use reth_stages_types::StageId;
26use std::pin::Pin;
27use tokio_stream::StreamExt;
28use url::Url;
29
30#[expect(missing_debug_implementations)]
32pub struct NodeTestContext<Node, AddOns>
33where
34 Node: FullNodeComponents,
35 AddOns: RethRpcAddOns<Node>,
36{
37 pub inner: FullNode<Node, AddOns>,
39 pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
41 pub network: NetworkTestContext<Node::Network>,
43 pub rpc: RpcTestContext<Node, AddOns::EthApi>,
45 pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
47}
48
49impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
50where
51 Payload: PayloadTypes,
52 Node: FullNodeComponents,
53 Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
54 Node::Network: PeersHandleProvider,
55 AddOns: RethRpcAddOns<Node>,
56{
57 pub async fn new(
59 node: FullNode<Node, AddOns>,
60 attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
61 ) -> eyre::Result<Self> {
62 Ok(Self {
63 inner: node.clone(),
64 payload: PayloadTestContext::new(
65 node.payload_builder_handle.clone(),
66 attributes_generator,
67 )
68 .await?,
69 network: NetworkTestContext::new(node.network.clone()),
70 rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
71 canonical_stream: node.provider.canonical_state_stream(),
72 })
73 }
74
75 pub async fn connect(&mut self, node: &mut Self) {
77 self.network.add_peer(node.network.record()).await;
78 node.network.next_session_established().await;
79 self.network.next_session_established().await;
80 }
81
82 pub async fn advance(
86 &mut self,
87 length: u64,
88 tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
89 ) -> eyre::Result<Vec<Payload::BuiltPayload>>
90 where
91 AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
92 + EthTransactions
93 + TraceExt,
94 {
95 let mut chain = Vec::with_capacity(length as usize);
96 for i in 0..length {
97 let raw_tx = tx_generator(i).await;
98 let tx_hash = self.rpc.inject_tx(raw_tx).await?;
99 let payload = self.advance_block().await?;
100 let block_hash = payload.block().hash();
101 let block_number = payload.block().number();
102 self.assert_new_block(tx_hash, block_hash, block_number).await?;
103 chain.push(payload);
104 }
105 Ok(chain)
106 }
107
108 pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
113 let eth_attr = self.payload.new_payload().await.unwrap();
115 self.payload.expect_attr_event(eth_attr.clone()).await?;
117 self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
119 Ok(self.payload.expect_built_payload().await?)
121 }
122
123 pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
125 let payload = self.new_payload().await?;
126
127 self.submit_payload(payload.clone()).await?;
128
129 Ok(payload)
130 }
131
132 pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
134 let payload = self.build_and_submit_payload().await?;
135
136 self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
138
139 Ok(payload)
140 }
141
142 pub async fn wait_block(
144 &self,
145 number: BlockNumber,
146 expected_block_hash: BlockHash,
147 wait_finish_checkpoint: bool,
148 ) -> eyre::Result<()> {
149 let mut check = !wait_finish_checkpoint;
150 loop {
151 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
152
153 if !check &&
154 wait_finish_checkpoint &&
155 let Some(checkpoint) =
156 self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
157 checkpoint.block_number >= number
158 {
159 check = true
160 }
161
162 if check {
163 if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
164 assert_eq!(latest_header.hash_slow(), expected_block_hash);
165 break
166 }
167 assert!(
168 !wait_finish_checkpoint,
169 "Finish checkpoint matches, but could not fetch block."
170 );
171 }
172 }
173 Ok(())
174 }
175
176 pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
178 loop {
179 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
180 if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
181 checkpoint.block_number == number
182 {
183 break
184 }
185 }
186 Ok(())
187 }
188
189 pub async fn assert_new_block(
194 &mut self,
195 tip_tx_hash: B256,
196 block_hash: B256,
197 block_number: BlockNumber,
198 ) -> eyre::Result<()> {
199 let head = self.canonical_stream.next().await.unwrap();
202 let tx = head.tip().body().transactions().first();
203 assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
204
205 loop {
206 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
208 if let Some(latest_block) =
209 self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
210 latest_block.header().number() == block_number
211 {
212 assert_eq!(latest_block.header().hash_slow(), block_hash);
215 break
216 }
217 }
218 Ok(())
219 }
220
221 pub fn block_hash(&self, number: u64) -> BlockHash {
223 self.inner
224 .provider
225 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
226 .unwrap()
227 .unwrap()
228 .hash()
229 }
230
231 pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
233 let start = std::time::Instant::now();
234
235 while self
236 .inner
237 .provider
238 .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
239 .is_none_or(|h| h.hash() != block)
240 {
241 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
242 self.update_forkchoice(block, block).await?;
243
244 assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
245 }
246
247 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
251
252 Ok(())
253 }
254
255 pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
257 self.inner
258 .add_ons_handle
259 .beacon_engine_handle
260 .fork_choice_updated(
261 ForkchoiceState {
262 head_block_hash: new_head,
263 safe_block_hash: current_head,
264 finalized_block_hash: current_head,
265 },
266 None,
267 EngineApiMessageVersion::default(),
268 )
269 .await?;
270
271 Ok(())
272 }
273
274 pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
276 self.update_forkchoice(B256::ZERO, hash).await
277 }
278
279 pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
281 let block_hash = payload.block().hash();
282 self.inner
283 .add_ons_handle
284 .beacon_engine_handle
285 .new_payload(Payload::block_to_payload(payload.block().clone()))
286 .await?;
287
288 Ok(block_hash)
289 }
290
291 pub fn rpc_url(&self) -> Url {
293 let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
294 format!("http://{addr}").parse().unwrap()
295 }
296
297 pub fn rpc_client(&self) -> Option<HttpClient> {
299 self.inner.rpc_server_handle().http_client()
300 }
301
302 pub fn auth_server_handle(&self) -> AuthServerHandle {
304 self.inner.auth_server_handle().clone()
305 }
306
307 pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
313 let rpc = self
314 .rpc_client()
315 .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
316 let auth = self.auth_server_handle();
317 let url = self.rpc_url();
318 let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
319
320 Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
321 }
322}