Skip to main content

reth_e2e_test_utils/
node.rs

1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV5, ForkchoiceState};
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::{core::client::ClientT, http_client::HttpClient};
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{Block, BlockBody, BlockTy, FullNodeComponents, PayloadTypes, PrimitivesTy};
13use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
14
15use reth_payload_primitives::BuiltPayload;
16use reth_provider::{
17    BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
18    HeaderProvider, StageCheckpointReader,
19};
20use reth_rpc_api::TestingBuildBlockRequestV1;
21use reth_rpc_builder::auth::AuthServerHandle;
22use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
23use reth_stages_types::StageId;
24use std::pin::Pin;
25use tokio_stream::StreamExt;
26use url::Url;
27
28/// A helper struct to handle node actions
29#[expect(missing_debug_implementations)]
30pub struct NodeTestContext<Node, AddOns>
31where
32    Node: FullNodeComponents,
33    AddOns: RethRpcAddOns<Node>,
34{
35    /// The core structure representing the full node.
36    pub inner: FullNode<Node, AddOns>,
37    /// Context for testing payload-related features.
38    pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
39    /// Context for testing network functionalities.
40    pub network: NetworkTestContext<Node::Network>,
41    /// Context for testing RPC features.
42    pub rpc: RpcTestContext<Node, AddOns::EthApi>,
43    /// Canonical state events.
44    pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
45}
46
47impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
48where
49    Payload: PayloadTypes,
50    Node: FullNodeComponents,
51    Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
52    Node::Network: PeersHandleProvider,
53    AddOns: RethRpcAddOns<Node>,
54{
55    /// Creates a new test node
56    pub async fn new(
57        node: FullNode<Node, AddOns>,
58        attributes_generator: impl Fn(u64) -> Payload::PayloadAttributes + Send + Sync + 'static,
59    ) -> eyre::Result<Self> {
60        Ok(Self {
61            inner: node.clone(),
62            payload: PayloadTestContext::new(
63                node.payload_builder_handle.clone(),
64                attributes_generator,
65            )
66            .await?,
67            network: NetworkTestContext::new(node.network.clone()),
68            rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
69            canonical_stream: node.provider.canonical_state_stream(),
70        })
71    }
72
73    /// Establish a connection to the node
74    pub async fn connect(&mut self, node: &mut Self) {
75        self.network.add_peer(node.network.record()).await;
76        node.network.next_session_established().await;
77        self.network.next_session_established().await;
78    }
79
80    /// Advances the chain `length` blocks.
81    ///
82    /// Returns the added chain as a Vec of block hashes.
83    pub async fn advance(
84        &mut self,
85        length: u64,
86        tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
87    ) -> eyre::Result<Vec<Payload::BuiltPayload>>
88    where
89        AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
90            + EthTransactions
91            + TraceExt,
92    {
93        let mut chain = Vec::with_capacity(length as usize);
94        for i in 0..length {
95            let raw_tx = tx_generator(i).await;
96            let tx_hash = self.rpc.inject_tx(raw_tx).await?;
97            let payload = self.advance_block().await?;
98            let block_hash = payload.block().hash();
99            let block_number = payload.block().number();
100            self.assert_new_block(tx_hash, block_hash, block_number).await?;
101            chain.push(payload);
102        }
103        Ok(chain)
104    }
105
106    /// Returns the current forkchoice state of the node.
107    pub fn current_forkchoice_state(&self) -> eyre::Result<ForkchoiceState> {
108        let latest_header =
109            self.inner.provider.sealed_header_by_number_or_tag(BlockNumberOrTag::Latest)?.unwrap();
110
111        if latest_header.number() == 0 {
112            return Ok(ForkchoiceState::same_hash(latest_header.hash()));
113        }
114
115        Ok(ForkchoiceState {
116            head_block_hash: latest_header.hash(),
117            safe_block_hash: self
118                .inner
119                .provider
120                .sealed_header_by_number_or_tag(BlockNumberOrTag::Safe)?
121                .unwrap()
122                .hash(),
123            finalized_block_hash: self
124                .inner
125                .provider
126                .sealed_header_by_number_or_tag(BlockNumberOrTag::Finalized)?
127                .unwrap()
128                .hash(),
129        })
130    }
131
132    /// Creates a new payload from given attributes generator
133    /// expects a payload attribute event and waits until the payload is built.
134    ///
135    /// It triggers the resolve payload via engine api and expects the built payload event.
136    pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
137        let eth_attr = self.payload.next_attributes();
138        let payload_id = self
139            .inner
140            .add_ons_handle
141            .beacon_engine_handle
142            .fork_choice_updated(self.current_forkchoice_state()?, Some(eth_attr.clone()))
143            .await?
144            .payload_id
145            .unwrap();
146        // first event is the payload attributes
147        self.payload.expect_attr_event(eth_attr).await?;
148        // wait for the payload builder to have finished building
149        self.payload.wait_for_built_payload(payload_id).await;
150        // ensure we're also receiving the built payload as event
151        Ok(self.payload.expect_built_payload().await?)
152    }
153
154    /// Triggers payload building job and submits it to the engine.
155    pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
156        let payload = self.new_payload().await?;
157
158        self.submit_payload(payload.clone()).await?;
159
160        Ok(payload)
161    }
162
163    /// Advances the node forward one block
164    pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
165        let payload = self.build_and_submit_payload().await?;
166
167        // trigger forkchoice update via engine api to commit the block to the blockchain
168        self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
169
170        Ok(payload)
171    }
172
173    /// Waits for block to be available on node.
174    pub async fn wait_block(
175        &self,
176        number: BlockNumber,
177        expected_block_hash: BlockHash,
178        wait_finish_checkpoint: bool,
179    ) -> eyre::Result<()> {
180        let mut check = !wait_finish_checkpoint;
181        loop {
182            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
183
184            if !check &&
185                wait_finish_checkpoint &&
186                let Some(checkpoint) =
187                    self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
188                checkpoint.block_number >= number
189            {
190                check = true
191            }
192
193            if check {
194                if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
195                    assert_eq!(latest_header.hash_slow(), expected_block_hash);
196                    break
197                }
198                assert!(
199                    !wait_finish_checkpoint,
200                    "Finish checkpoint matches, but could not fetch block."
201                );
202            }
203        }
204        Ok(())
205    }
206
207    /// Waits for the node to unwind to the given block number
208    pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
209        loop {
210            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
211            if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
212                checkpoint.block_number == number
213            {
214                break
215            }
216        }
217        Ok(())
218    }
219
220    /// Asserts that a new block has been added to the blockchain
221    /// and the tx has been included in the block.
222    ///
223    /// Does NOT work for pipeline since there's no stream notification!
224    pub async fn assert_new_block(
225        &mut self,
226        tip_tx_hash: B256,
227        block_hash: B256,
228        block_number: BlockNumber,
229    ) -> eyre::Result<()> {
230        // get head block from notifications stream and verify the tx has been pushed to the
231        // pool is actually present in the canonical block
232        let head = self.canonical_stream.next().await.unwrap();
233        let tx = head.tip().body().transactions().first();
234        assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
235
236        loop {
237            // wait for the block to commit
238            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
239            if let Some(latest_block) =
240                self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
241                latest_block.header().number() == block_number
242            {
243                // make sure the block hash we submitted via FCU engine api is the new latest
244                // block using an RPC call
245                assert_eq!(latest_block.header().hash_slow(), block_hash);
246                break
247            }
248        }
249        Ok(())
250    }
251
252    /// Gets block hash by number.
253    pub fn block_hash(&self, number: u64) -> BlockHash {
254        self.inner
255            .provider
256            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
257            .unwrap()
258            .unwrap()
259            .hash()
260    }
261
262    /// Sends FCU and waits for the node to sync to the given block.
263    pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
264        let start = std::time::Instant::now();
265
266        while self
267            .inner
268            .provider
269            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
270            .is_none_or(|h| h.hash() != block)
271        {
272            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
273            self.update_forkchoice(block, block).await?;
274
275            assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
276        }
277
278        // Hack to make sure that all components have time to process canonical state update.
279        // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
280        // making tests flaky.
281        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
282
283        Ok(())
284    }
285
286    /// Sends a forkchoice update message to the engine.
287    pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
288        self.inner
289            .add_ons_handle
290            .beacon_engine_handle
291            .fork_choice_updated(
292                ForkchoiceState {
293                    head_block_hash: new_head,
294                    safe_block_hash: current_head,
295                    finalized_block_hash: current_head,
296                },
297                None,
298            )
299            .await?;
300
301        Ok(())
302    }
303
304    /// Sends forkchoice update to the engine api with a zero finalized hash
305    pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
306        self.update_forkchoice(B256::ZERO, hash).await
307    }
308
309    /// Submits a payload to the engine.
310    pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
311        let block_hash = payload.block().hash();
312        self.inner
313            .add_ons_handle
314            .beacon_engine_handle
315            .new_payload(Payload::block_to_payload(payload.block().clone()))
316            .await?;
317
318        Ok(block_hash)
319    }
320
321    /// Returns the RPC URL.
322    pub fn rpc_url(&self) -> Url {
323        let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
324        format!("http://{addr}").parse().unwrap()
325    }
326
327    /// Returns an RPC client.
328    pub fn rpc_client(&self) -> Option<HttpClient> {
329        self.inner.rpc_server_handle().http_client()
330    }
331
332    /// Returns an Engine API client.
333    pub fn auth_server_handle(&self) -> AuthServerHandle {
334        self.inner.auth_server_handle().clone()
335    }
336
337    /// Creates a [`crate::testsuite::NodeClient`] from this test context.
338    ///
339    /// This helper method extracts the necessary handles and creates a client
340    /// that can interact with both the regular RPC and Engine API endpoints.
341    /// It automatically includes the beacon engine handle for direct consensus engine interaction.
342    pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
343        let rpc = self
344            .rpc_client()
345            .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
346        let auth = self.auth_server_handle();
347        let url = self.rpc_url();
348        let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
349
350        Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
351    }
352
353    /// Calls the `testing_buildBlockV1` RPC on this node.
354    ///
355    /// This endpoint builds a block using the provided parent, payload attributes, and
356    /// transactions. Requires the `Testing` RPC module to be enabled.
357    pub async fn testing_build_block_v1(
358        &self,
359        request: TestingBuildBlockRequestV1,
360    ) -> eyre::Result<ExecutionPayloadEnvelopeV5> {
361        let client =
362            self.rpc_client().ok_or_else(|| eyre::eyre!("HTTP RPC client not available"))?;
363
364        let res: ExecutionPayloadEnvelopeV5 =
365            client.request("testing_buildBlockV1", [request]).await?;
366        eyre::Ok(res)
367    }
368}