reth_e2e_test_utils/
payload.rs1use futures_util::StreamExt;
2use reth_node_api::{BlockBody, PayloadAttributes, PayloadKind};
3use reth_payload_builder::{PayloadBuilderHandle, PayloadId};
4use reth_payload_builder_primitives::Events;
5use reth_payload_primitives::{BuiltPayload, PayloadTypes};
6use tokio_stream::wrappers::BroadcastStream;
7
8#[derive(derive_more::Debug)]
10pub struct PayloadTestContext<T: PayloadTypes> {
11 pub payload_event_stream: BroadcastStream<Events<T>>,
12 payload_builder: PayloadBuilderHandle<T>,
13 pub timestamp: u64,
14 #[debug(skip)]
15 attributes_generator: Box<dyn Fn(u64) -> T::PayloadAttributes + Send + Sync>,
16}
17
18impl<T: PayloadTypes> PayloadTestContext<T> {
19 pub async fn new(
21 payload_builder: PayloadBuilderHandle<T>,
22 attributes_generator: impl Fn(u64) -> T::PayloadAttributes + Send + Sync + 'static,
23 ) -> eyre::Result<Self> {
24 let payload_events = payload_builder.subscribe().await?;
25 let payload_event_stream = payload_events.into_stream();
26 Ok(Self {
28 payload_event_stream,
29 payload_builder,
30 timestamp: 1710338135,
31 attributes_generator: Box::new(attributes_generator),
32 })
33 }
34
35 pub fn next_attributes(&mut self) -> T::PayloadAttributes {
37 self.timestamp += 1;
38 (self.attributes_generator)(self.timestamp)
39 }
40
41 pub async fn expect_attr_event(&mut self, attrs: T::PayloadAttributes) -> eyre::Result<()> {
43 let first_event = self.payload_event_stream.next().await.unwrap()?;
44 if let Events::Attributes(attr) = first_event {
45 assert_eq!(attrs.timestamp(), attr.timestamp());
46 } else {
47 panic!("Expect first event as payload attributes.")
48 }
49 Ok(())
50 }
51
52 pub async fn wait_for_built_payload(&self, payload_id: PayloadId) {
56 let start = std::time::Instant::now();
57 loop {
58 let payload =
59 self.payload_builder.best_payload(payload_id).await.transpose().ok().flatten();
60 if payload.is_none_or(|p| p.block().body().transactions().is_empty()) {
61 assert!(
62 start.elapsed() < std::time::Duration::from_secs(30),
63 "timed out waiting for a non-empty payload for {payload_id} — \
64 check that the chain spec supports all generated tx types"
65 );
66 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
67 continue
68 }
69 self.payload_builder
71 .resolve_kind(payload_id, PayloadKind::Earliest)
72 .await
73 .unwrap()
74 .unwrap();
75 break;
76 }
77 }
78
79 pub async fn expect_built_payload(&mut self) -> eyre::Result<T::BuiltPayload> {
81 let second_event = self.payload_event_stream.next().await.unwrap()?;
82 if let Events::BuiltPayload(payload) = second_event {
83 Ok(payload)
84 } else {
85 panic!("Expect a built payload event.");
86 }
87 }
88}