// reth_bench/bench/new_payload_fcu.rs
use crate::{
5 bench::{
6 context::BenchContext,
7 output::{
8 CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9 GAS_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::ForkchoiceState;
16use clap::Parser;
17use csv::Writer;
18use eyre::{Context, OptionExt};
19use humantime::parse_duration;
20use reth_cli_runner::CliContext;
21use reth_node_core::args::BenchmarkArgs;
22use std::time::{Duration, Instant};
23use tracing::{debug, info};
24
25#[derive(Debug, Parser)]
27pub struct Command {
28 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
30 rpc_url: String,
31
32 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
34 wait_time: Option<Duration>,
35
36 #[command(flatten)]
37 benchmark: BenchmarkArgs,
38}
39
40impl Command {
41 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
43 let BenchContext {
44 benchmark_mode,
45 block_provider,
46 auth_provider,
47 mut next_block,
48 is_optimism,
49 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
50
51 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
52 tokio::task::spawn(async move {
53 while benchmark_mode.contains(next_block) {
54 let block_res = block_provider
55 .get_block_by_number(next_block.into())
56 .full()
57 .await
58 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
59 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
60 Ok(block) => block,
61 Err(e) => {
62 tracing::error!("Failed to fetch block {next_block}: {e}");
63 break;
64 }
65 };
66 let header = block.header.clone();
67
68 let (version, params) = match block_to_new_payload(block, is_optimism) {
69 Ok(result) => result,
70 Err(e) => {
71 tracing::error!("Failed to convert block to new payload: {e}");
72 break;
73 }
74 };
75 let head_block_hash = header.hash;
76 let safe_block_hash =
77 block_provider.get_block_by_number(header.number.saturating_sub(32).into());
78
79 let finalized_block_hash =
80 block_provider.get_block_by_number(header.number.saturating_sub(64).into());
81
82 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
83
84 let safe_block_hash = match safe {
85 Ok(Some(block)) => block.header.hash,
86 Ok(None) | Err(_) => head_block_hash,
87 };
88
89 let finalized_block_hash = match finalized {
90 Ok(Some(block)) => block.header.hash,
91 Ok(None) | Err(_) => head_block_hash,
92 };
93
94 next_block += 1;
95 if let Err(e) = sender
96 .send((
97 header,
98 version,
99 params,
100 head_block_hash,
101 safe_block_hash,
102 finalized_block_hash,
103 ))
104 .await
105 {
106 tracing::error!("Failed to send block data: {e}");
107 break;
108 }
109 }
110 });
111
112 let mut results = Vec::new();
114 let total_benchmark_duration = Instant::now();
115 let mut total_wait_time = Duration::ZERO;
116
117 while let Some((header, version, params, head, safe, finalized)) = {
118 let wait_start = Instant::now();
119 let result = receiver.recv().await;
120 total_wait_time += wait_start.elapsed();
121 result
122 } {
123 let gas_used = header.gas_used;
125 let block_number = header.number;
126
127 debug!(target: "reth-bench", ?block_number, "Sending payload",);
128
129 let forkchoice_state = ForkchoiceState {
131 head_block_hash: head,
132 safe_block_hash: safe,
133 finalized_block_hash: finalized,
134 };
135
136 let start = Instant::now();
137 call_new_payload(&auth_provider, version, params).await?;
138
139 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
140
141 call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
142
143 let total_latency = start.elapsed();
145 let fcu_latency = total_latency - new_payload_result.latency;
146 let combined_result =
147 CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
148
149 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
152
153 info!(%combined_result);
155
156 if let Some(wait_time) = self.wait_time {
158 tokio::time::sleep(wait_time).await;
159 }
160
161 let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
163 results.push((gas_row, combined_result));
164 }
165
166 let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
167 results.into_iter().unzip();
168
169 if let Some(path) = self.benchmark.output {
171 let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
173 info!("Writing engine api call latency output to file: {:?}", output_path);
174 let mut writer = Writer::from_path(output_path)?;
175 for result in combined_results {
176 writer.serialize(result)?;
177 }
178 writer.flush()?;
179
180 let output_path = path.join(GAS_OUTPUT_SUFFIX);
182 info!("Writing total gas output to file: {:?}", output_path);
183 let mut writer = Writer::from_path(output_path)?;
184 for row in &gas_output_results {
185 writer.serialize(row)?;
186 }
187 writer.flush()?;
188
189 info!("Finished writing benchmark output files to {:?}.", path);
190 }
191
192 let gas_output = TotalGasOutput::new(gas_output_results)?;
194 info!(
195 total_duration=?gas_output.total_duration,
196 total_gas_used=?gas_output.total_gas_used,
197 blocks_processed=?gas_output.blocks_processed,
198 "Total Ggas/s: {:.4}",
199 gas_output.total_gigagas_per_second()
200 );
201
202 Ok(())
203 }
204}