reth_bench/bench/
new_payload_fcu.rs
1use crate::{
5 bench::{
6 context::BenchContext,
7 output::{
8 CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9 GAS_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState};
16use clap::Parser;
17use csv::Writer;
18use humantime::parse_duration;
19use reth_cli_runner::CliContext;
20use reth_node_core::args::BenchmarkArgs;
21use std::time::{Duration, Instant};
22use tracing::{debug, info};
23
24#[derive(Debug, Parser)]
26pub struct Command {
27 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
29 rpc_url: String,
30
31 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
33 wait_time: Option<Duration>,
34
35 #[command(flatten)]
36 benchmark: BenchmarkArgs,
37}
38
39impl Command {
40 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
42 let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
43 BenchContext::new(&self.benchmark, self.rpc_url).await?;
44
45 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
46 tokio::task::spawn(async move {
47 while benchmark_mode.contains(next_block) {
48 let block_res = block_provider.get_block_by_number(next_block.into()).full().await;
49 let block = block_res.unwrap().unwrap();
50
51 let block = block
52 .into_inner()
53 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
54 .try_map_transactions(|tx| {
55 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
57 })
58 .unwrap()
59 .into_consensus();
60
61 let blob_versioned_hashes =
62 block.body.blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
63
64 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
66 let header = block.header;
67 let head_block_hash = payload.block_hash();
68 let safe_block_hash =
69 block_provider.get_block_by_number(header.number.saturating_sub(32).into());
70
71 let finalized_block_hash =
72 block_provider.get_block_by_number(header.number.saturating_sub(64).into());
73
74 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
75
76 let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
77 let finalized_block_hash =
78 finalized.unwrap().expect("finalized block exists").header.hash;
79
80 next_block += 1;
81 sender
82 .send((
83 header,
84 blob_versioned_hashes,
85 payload,
86 sidecar,
87 head_block_hash,
88 safe_block_hash,
89 finalized_block_hash,
90 ))
91 .await
92 .unwrap();
93 }
94 });
95
96 let mut results = Vec::new();
98 let total_benchmark_duration = Instant::now();
99 let mut total_wait_time = Duration::ZERO;
100
101 while let Some((header, versioned_hashes, payload, sidecar, head, safe, finalized)) = {
102 let wait_start = Instant::now();
103 let result = receiver.recv().await;
104 total_wait_time += wait_start.elapsed();
105 result
106 } {
107 let gas_used = header.gas_used;
109 let block_number = header.number;
110
111 debug!(target: "reth-bench", ?block_number, "Sending payload",);
112
113 let forkchoice_state = ForkchoiceState {
115 head_block_hash: head,
116 safe_block_hash: safe,
117 finalized_block_hash: finalized,
118 };
119
120 let start = Instant::now();
121 let message_version = call_new_payload(
122 &auth_provider,
123 payload,
124 sidecar,
125 header.parent_beacon_block_root,
126 versioned_hashes,
127 )
128 .await?;
129
130 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
131
132 call_forkchoice_updated(&auth_provider, message_version, forkchoice_state, None)
133 .await?;
134
135 let total_latency = start.elapsed();
137 let fcu_latency = total_latency - new_payload_result.latency;
138 let combined_result =
139 CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
140
141 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
144
145 info!(%combined_result);
147
148 if let Some(wait_time) = self.wait_time {
150 tokio::time::sleep(wait_time).await;
151 }
152
153 let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
155 results.push((gas_row, combined_result));
156 }
157
158 let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
159 results.into_iter().unzip();
160
161 if let Some(path) = self.benchmark.output {
163 let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
165 info!("Writing engine api call latency output to file: {:?}", output_path);
166 let mut writer = Writer::from_path(output_path)?;
167 for result in combined_results {
168 writer.serialize(result)?;
169 }
170 writer.flush()?;
171
172 let output_path = path.join(GAS_OUTPUT_SUFFIX);
174 info!("Writing total gas output to file: {:?}", output_path);
175 let mut writer = Writer::from_path(output_path)?;
176 for row in &gas_output_results {
177 writer.serialize(row)?;
178 }
179 writer.flush()?;
180
181 info!("Finished writing benchmark output files to {:?}.", path);
182 }
183
184 let gas_output = TotalGasOutput::new(gas_output_results);
186 info!(
187 total_duration=?gas_output.total_duration,
188 total_gas_used=?gas_output.total_gas_used,
189 blocks_processed=?gas_output.blocks_processed,
190 "Total Ggas/s: {:.4}",
191 gas_output.total_gigagas_per_second()
192 );
193
194 Ok(())
195 }
196}