use crate::{
engine_api::EngineApiTestContext, network::NetworkTestContext, payload::PayloadTestContext,
rpc::RpcTestContext, traits::PayloadEnvelopeExt,
};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockId;
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
use alloy_rpc_types_engine::PayloadStatusEnum;
use alloy_rpc_types_eth::BlockNumberOrTag;
use eyre::Ok;
use futures_util::Future;
use reth_chainspec::EthereumHardforks;
use reth_network_api::test_utils::PeersHandleProvider;
use reth_node_api::{Block, EngineTypes, FullNodeComponents};
use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes, NodeTypesWithEngine};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
use reth_primitives::EthPrimitives;
use reth_provider::{
BlockReader, BlockReaderIdExt, CanonStateSubscriptions, StageCheckpointReader,
};
use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
use reth_stages_types::StageId;
use std::{marker::PhantomData, pin::Pin};
use tokio_stream::StreamExt;
use url::Url;
/// Test wrapper around a [`FullNode`] that groups the node together with the
/// helper contexts (payload, network, engine API, RPC) created from it.
// NOTE(review): `missing_debug_implementations` is silenced here, presumably
// because not every field type implements `Debug` — confirm.
#[allow(missing_debug_implementations)]
pub struct NodeTestContext<Node, AddOns>
where
Node: FullNodeComponents,
AddOns: RethRpcAddOns<Node>,
{
/// The wrapped node.
pub inner: FullNode<Node, AddOns>,
/// Payload helper context, parameterized by the node's engine types.
pub payload: PayloadTestContext<<Node::Types as NodeTypesWithEngine>::Engine>,
/// Network helper context, parameterized by the node's network type.
pub network: NetworkTestContext<Node::Network>,
/// Engine API helper context, parameterized by the node's engine types and chain spec.
pub engine_api: EngineApiTestContext<
<Node::Types as NodeTypesWithEngine>::Engine,
<Node::Types as NodeTypes>::ChainSpec,
>,
/// RPC helper context, parameterized by the node and the add-ons' `EthApi` type.
pub rpc: RpcTestContext<Node, AddOns::EthApi>,
}
impl<Node, Engine, AddOns> NodeTestContext<Node, AddOns>
where
Engine: EngineTypes,
Node: FullNodeComponents,
Node::Types: NodeTypesWithEngine<
ChainSpec: EthereumHardforks,
Engine = Engine,
Primitives = EthPrimitives,
>,
Node::Network: PeersHandleProvider,
AddOns: RethRpcAddOns<Node>,
{
/// Builds a [`NodeTestContext`] around an already constructed `node`.
///
/// `attributes_generator` is forwarded to [`PayloadTestContext::new`] together with a
/// clone of the node's payload builder handle.
///
/// # Errors
///
/// Returns the error produced by [`PayloadTestContext::new`], if any.
pub async fn new(
    node: FullNode<Node, AddOns>,
    attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes + 'static,
) -> eyre::Result<Self> {
    let payload_builder = node.payload_builder.clone();
    let inner = node.clone();
    let payload = PayloadTestContext::new(payload_builder, attributes_generator).await?;
    let network = NetworkTestContext::new(node.network.clone());
    let engine_api = EngineApiTestContext {
        chain_spec: node.chain_spec(),
        engine_api_client: node.auth_server_handle().http_client(),
        canonical_stream: node.provider.canonical_state_stream(),
        _marker: PhantomData::<Engine>,
    };
    // Last use of `node`: the RPC registry is moved out of it.
    let rpc = RpcTestContext { inner: node.add_ons_handle.rpc_registry };

    Ok(Self { inner, payload, network, engine_api, rpc })
}
/// Adds `node` as a peer of this node, then awaits `next_session_established` on
/// `node`'s network context and afterwards on this node's own.
pub async fn connect(&mut self, node: &mut Self) {
    let peer_record = node.network.record();
    self.network.add_peer(peer_record).await;

    // The remote side is awaited first, then the local side.
    node.network.next_session_established().await;
    self.network.next_session_established().await;
}
/// Runs `length` rounds of: generate a raw transaction, inject it over RPC, advance the
/// chain by one block and assert the new block via [`Self::assert_new_block`].
///
/// `tx_generator` is called with the zero-based round index.
///
/// Returns the `(payload, attributes)` pair of every round, in order.
///
/// # Errors
///
/// Stops at the first error from the injection, the block advance or the assertion step.
pub async fn advance(
    &mut self,
    length: u64,
    tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
) -> eyre::Result<Vec<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>>
where
    Engine::ExecutionPayloadEnvelopeV3: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
    Engine::ExecutionPayloadEnvelopeV4: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
    AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = reth_primitives::Block>>
        + EthTransactions
        + TraceExt,
{
    let mut produced = Vec::with_capacity(length as usize);

    for round in 0..length {
        let raw_tx = tx_generator(round).await;
        let tx_hash = self.rpc.inject_tx(raw_tx).await?;

        let (payload, attributes) = self.advance_block().await?;
        let sealed = payload.block();
        let (hash, number) = (sealed.hash(), sealed.number);

        self.assert_new_block(tx_hash, hash, number).await?;
        produced.push((payload, attributes));
    }

    Ok(produced)
}
/// Starts a new payload job through the payload context and returns the built payload
/// together with the attributes it was requested with.
///
/// The steps are, in order: request a new payload, expect the matching attributes event,
/// wait for the payload with that id to be built, fetch it through the engine API
/// (`get_payload_v3_value`) and finally expect the built-payload event.
///
/// # Errors
///
/// Returns an error if any of the fallible steps fails. This includes the initial
/// payload request, which is propagated to the caller rather than panicking.
pub async fn new_payload(
    &mut self,
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
where
    <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
        From<Engine::BuiltPayload> + PayloadEnvelopeExt,
{
    // Propagate a failed request with `?`, consistent with every other step below.
    let eth_attr = self.payload.new_payload().await?;

    self.payload.expect_attr_event(eth_attr.clone()).await?;
    self.payload.wait_for_built_payload(eth_attr.payload_id()).await;

    // The returned value is not needed here; only success of the call matters.
    self.engine_api.get_payload_v3_value(eth_attr.payload_id()).await?;

    let payload = self.payload.expect_built_payload().await?;
    Ok((payload, eth_attr))
}
/// Builds a payload via [`Self::new_payload`] and submits it through the engine API,
/// passing [`PayloadStatusEnum::Valid`] as the expected status.
///
/// Returns the built payload and its attributes.
///
/// # Errors
///
/// Returns an error if building or submitting the payload fails.
pub async fn build_and_submit_payload(
    &mut self,
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
where
    <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
        From<Engine::BuiltPayload> + PayloadEnvelopeExt,
    <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
        From<Engine::BuiltPayload> + PayloadEnvelopeExt,
{
    let (built_payload, attributes) = self.new_payload().await?;

    // Submission takes ownership, so hand over copies and keep the originals to return.
    let (payload_copy, attributes_copy) = (built_payload.clone(), attributes.clone());
    self.engine_api
        .submit_payload(payload_copy, attributes_copy, PayloadStatusEnum::Valid)
        .await?;

    Ok((built_payload, attributes))
}
/// Builds and submits a payload, then calls `update_forkchoice` on the engine API with
/// the new block's hash for both arguments.
///
/// Returns the built payload and its attributes.
///
/// # Errors
///
/// Returns an error if [`Self::build_and_submit_payload`] or the forkchoice update fails.
pub async fn advance_block(
    &mut self,
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
where
    <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
        From<Engine::BuiltPayload> + PayloadEnvelopeExt,
    <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
        From<Engine::BuiltPayload> + PayloadEnvelopeExt,
{
    let built = self.build_and_submit_payload().await?;

    let new_head = built.0.block().hash();
    self.engine_api.update_forkchoice(new_head, new_head).await?;

    Ok(built)
}
/// Polls the provider every 20ms until block `number` can be fetched, then asserts that
/// its header hash equals `expected_block_hash`.
///
/// When `wait_finish_checkpoint` is `true`, the block is only looked up once the
/// [`StageId::Finish`] checkpoint has reached at least `number`.
///
/// There is no timeout: the loop runs until the block is found.
///
/// # Errors
///
/// Returns an error if a provider lookup fails.
///
/// # Panics
///
/// Panics if the fetched block's hash differs from `expected_block_hash`, or if
/// `wait_finish_checkpoint` is set and the checkpoint was reached but the block is missing.
pub async fn wait_block(
    &self,
    number: BlockNumber,
    expected_block_hash: BlockHash,
    wait_finish_checkpoint: bool,
) -> eyre::Result<()> {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(20);

    // Without a checkpoint requirement the block lookup may start right away.
    let mut checkpoint_reached = !wait_finish_checkpoint;

    loop {
        tokio::time::sleep(POLL_INTERVAL).await;

        if !checkpoint_reached {
            checkpoint_reached = self
                .inner
                .provider
                .get_stage_checkpoint(StageId::Finish)?
                .is_some_and(|checkpoint| checkpoint.block_number >= number);
        }
        if !checkpoint_reached {
            continue
        }

        let Some(block) = self.inner.provider.block_by_number(number)? else {
            assert!(
                !wait_finish_checkpoint,
                "Finish checkpoint matches, but could not fetch block."
            );
            continue
        };

        assert_eq!(block.header().hash_slow(), expected_block_hash);
        return Ok(())
    }
}
/// Polls every 10ms until the [`StageId::Headers`] checkpoint's block number is exactly
/// `number`.
///
/// There is no timeout: the loop runs until the checkpoint matches.
///
/// # Errors
///
/// Returns an error if reading the stage checkpoint fails.
pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

    loop {
        tokio::time::sleep(POLL_INTERVAL).await;

        let at_target = self
            .inner
            .provider
            .get_stage_checkpoint(StageId::Headers)?
            .is_some_and(|checkpoint| checkpoint.block_number == number);
        if at_target {
            return Ok(())
        }
    }
}
/// Takes the next item from the canonical state stream and asserts that the first
/// transaction of its tip has hash `tip_tx_hash`. Then polls the provider's latest block
/// every 20ms until its number equals `block_number` and asserts that its header hash
/// equals `block_hash`.
///
/// # Errors
///
/// Returns an error if the provider lookup fails.
///
/// # Panics
///
/// Panics if the stream yields no item, if the tip has no transactions, or if either
/// hash comparison fails.
pub async fn assert_new_block(
    &mut self,
    tip_tx_hash: B256,
    block_hash: B256,
    block_number: BlockNumber,
) -> eyre::Result<()> {
    let notification = self.engine_api.canonical_stream.next().await.unwrap();
    let first_tx = notification.tip().transactions().first().unwrap();
    assert_eq!(first_tx.hash().as_slice(), tip_tx_hash.as_slice());

    loop {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        let Some(latest) =
            self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
        else {
            continue
        };
        // Keep polling until the latest block is the one we are waiting for.
        if latest.header().number() != block_number {
            continue
        }

        assert_eq!(latest.header().hash_slow(), block_hash);
        return Ok(())
    }
}
/// Returns the hash of the sealed header stored for block `number`.
///
/// # Panics
///
/// Panics if the provider lookup fails or returns no header for `number`.
pub fn block_hash(&self, number: u64) -> BlockHash {
    let target = BlockNumberOrTag::Number(number);
    let maybe_header = self.inner.provider.sealed_header_by_number_or_tag(target).unwrap();
    maybe_header.unwrap().hash()
}
/// Calls `update_forkchoice` on the engine API with `block` for both arguments, then
/// polls the latest sealed header every 100ms until its hash equals `block`.
///
/// After the header matches, sleeps for one more second before returning.
///
/// # Errors
///
/// Returns an error if the forkchoice update or a provider lookup fails.
///
/// # Panics
///
/// Panics with "timed out" if, after a poll sleep, more than 10 seconds have elapsed
/// since the forkchoice update returned.
pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
    self.engine_api.update_forkchoice(block, block).await?;

    let started = std::time::Instant::now();
    let limit = std::time::Duration::from_secs(10);

    loop {
        let latest = self
            .inner
            .provider
            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?;
        if latest.is_some_and(|header| header.hash() == block) {
            break
        }

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        assert!(started.elapsed() <= limit, "timed out");
    }

    tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
    Ok(())
}
pub fn rpc_url(&self) -> Url {
let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
format!("http://{}", addr).parse().unwrap()
}
}