reth_bench/bench/
new_payload_fcu.rs1use crate::{
5 bench::{
6 context::BenchContext,
7 output::{
8 CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9 GAS_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::ForkchoiceState;
16use clap::Parser;
17use csv::Writer;
18use eyre::Context;
19use humantime::parse_duration;
20use reth_cli_runner::CliContext;
21use reth_node_core::args::BenchmarkArgs;
22use std::time::{Duration, Instant};
23use tracing::{debug, info};
24
/// Benchmark command that, for every block in the configured range, calls
/// `call_new_payload` followed by `call_forkchoice_updated` and records the latencies.
#[derive(Debug, Parser)]
pub struct Command {
    /// RPC URL handed to `BenchContext::new` when the benchmark is set up.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    rpc_url: String,

    /// Optional pause after each block's forkchoice update, before the next payload is sent.
    ///
    /// Parsed with `humantime::parse_duration`.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    // Shared benchmark arguments, flattened into this command's flags. `execute` reads
    // `benchmark.output` to decide whether result files are written.
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
39
40impl Command {
41 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
43 let BenchContext {
44 benchmark_mode,
45 block_provider,
46 auth_provider,
47 mut next_block,
48 is_optimism,
49 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
50
51 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
52 tokio::task::spawn(async move {
53 while benchmark_mode.contains(next_block) {
54 let block_res = block_provider
55 .get_block_by_number(next_block.into())
56 .full()
57 .await
58 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
59 let block = block_res.unwrap().unwrap();
60 let header = block.header.clone();
61
62 let (version, params) = block_to_new_payload(block, is_optimism).unwrap();
63 let head_block_hash = header.hash;
64 let safe_block_hash =
65 block_provider.get_block_by_number(header.number.saturating_sub(32).into());
66
67 let finalized_block_hash =
68 block_provider.get_block_by_number(header.number.saturating_sub(64).into());
69
70 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
71
72 let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
73 let finalized_block_hash =
74 finalized.unwrap().expect("finalized block exists").header.hash;
75
76 next_block += 1;
77 sender
78 .send((
79 header,
80 version,
81 params,
82 head_block_hash,
83 safe_block_hash,
84 finalized_block_hash,
85 ))
86 .await
87 .unwrap();
88 }
89 });
90
91 let mut results = Vec::new();
93 let total_benchmark_duration = Instant::now();
94 let mut total_wait_time = Duration::ZERO;
95
96 while let Some((header, version, params, head, safe, finalized)) = {
97 let wait_start = Instant::now();
98 let result = receiver.recv().await;
99 total_wait_time += wait_start.elapsed();
100 result
101 } {
102 let gas_used = header.gas_used;
104 let block_number = header.number;
105
106 debug!(target: "reth-bench", ?block_number, "Sending payload",);
107
108 let forkchoice_state = ForkchoiceState {
110 head_block_hash: head,
111 safe_block_hash: safe,
112 finalized_block_hash: finalized,
113 };
114
115 let start = Instant::now();
116 call_new_payload(&auth_provider, version, params).await?;
117
118 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
119
120 call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
121
122 let total_latency = start.elapsed();
124 let fcu_latency = total_latency - new_payload_result.latency;
125 let combined_result =
126 CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
127
128 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
131
132 info!(%combined_result);
134
135 if let Some(wait_time) = self.wait_time {
137 tokio::time::sleep(wait_time).await;
138 }
139
140 let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
142 results.push((gas_row, combined_result));
143 }
144
145 let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
146 results.into_iter().unzip();
147
148 if let Some(path) = self.benchmark.output {
150 let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
152 info!("Writing engine api call latency output to file: {:?}", output_path);
153 let mut writer = Writer::from_path(output_path)?;
154 for result in combined_results {
155 writer.serialize(result)?;
156 }
157 writer.flush()?;
158
159 let output_path = path.join(GAS_OUTPUT_SUFFIX);
161 info!("Writing total gas output to file: {:?}", output_path);
162 let mut writer = Writer::from_path(output_path)?;
163 for row in &gas_output_results {
164 writer.serialize(row)?;
165 }
166 writer.flush()?;
167
168 info!("Finished writing benchmark output files to {:?}.", path);
169 }
170
171 let gas_output = TotalGasOutput::new(gas_output_results);
173 info!(
174 total_duration=?gas_output.total_duration,
175 total_gas_used=?gas_output.total_gas_used,
176 blocks_processed=?gas_output.blocks_processed,
177 "Total Ggas/s: {:.4}",
178 gas_output.total_gigagas_per_second()
179 );
180
181 Ok(())
182 }
183}