reth_bench/bench/
new_payload_only.rs1use crate::{
4 bench::{
5 context::BenchContext,
6 helpers::fetch_block_access_list,
7 metrics_scraper::MetricsScraper,
8 output::{
9 NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
10 NEW_PAYLOAD_OUTPUT_SUFFIX,
11 },
12 },
13 valid_payload::{block_to_new_payload, call_new_payload_with_reth},
14};
15use alloy_provider::{ext::DebugApi, Provider};
16use clap::Parser;
17use csv::Writer;
18use eyre::{Context, OptionExt};
19use reth_cli_runner::CliContext;
20use reth_node_core::args::BenchmarkArgs;
21use std::time::{Duration, Instant};
22use tracing::{debug, info};
23
24#[derive(Debug, Parser)]
26pub struct Command {
27 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
29 rpc_url: String,
30
31 #[arg(
34 long = "rpc-block-buffer-size",
35 value_name = "RPC_BLOCK_BUFFER_SIZE",
36 default_value = "20",
37 verbatim_doc_comment
38 )]
39 rpc_block_buffer_size: usize,
40
41 #[command(flatten)]
42 benchmark: BenchmarkArgs,
43}
44
45impl Command {
46 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
48 let BenchContext {
49 benchmark_mode,
50 block_provider,
51 auth_provider,
52 mut next_block,
53 use_reth_namespace,
54 rlp_blocks,
55 wait_for_persistence,
56 no_wait_for_caches,
57 ..
58 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
59
60 let total_blocks = benchmark_mode.total_blocks();
61
62 let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
63
64 if use_reth_namespace {
65 info!("Using reth_newPayload endpoint");
66 }
67
68 let buffer_size = self.rpc_block_buffer_size;
69
70 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
72 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
73
74 let block_provider_clone = block_provider.clone();
75 tokio::task::spawn(async move {
76 let block_provider = block_provider_clone;
77 while benchmark_mode.contains(next_block) {
78 let block_res = block_provider
79 .get_block_by_number(next_block.into())
80 .full()
81 .await
82 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
83 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
84 Ok(block) => block,
85 Err(e) => {
86 tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
87 let _ = error_sender.send(e);
88 break;
89 }
90 };
91
92 let rlp = if rlp_blocks {
93 let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
94 else {
95 tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
96 let _ = error_sender
97 .send(eyre::eyre!("Failed to fetch raw block {next_block}"));
98 break;
99 };
100 Some(rlp)
101 } else {
102 None
103 };
104
105 next_block += 1;
106 if let Err(e) = sender.send((block, rlp)).await {
107 tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
108 break;
109 }
110 }
111 });
112
113 let mut results = Vec::new();
114 let mut blocks_processed = 0u64;
115 let total_benchmark_duration = Instant::now();
116 let mut total_wait_time = Duration::ZERO;
117
118 while let Some((block, rlp)) = {
119 let wait_start = Instant::now();
120 let result = receiver.recv().await;
121 total_wait_time += wait_start.elapsed();
122 result
123 } {
124 let block_number = block.header.number;
125 let transaction_count = block.transactions.len() as u64;
126 let gas_used = block.header.gas_used;
127
128 debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
129
130 let bal = if rlp.is_none() && block.header.block_access_list_hash.is_some() {
131 Some(fetch_block_access_list(&block_provider, block.header.number).await?)
132 } else {
133 None
134 };
135
136 let (version, params) = block_to_new_payload(
137 block,
138 rlp,
139 use_reth_namespace,
140 wait_for_persistence,
141 no_wait_for_caches,
142 bal,
143 )?;
144
145 let start = Instant::now();
146 let server_timings =
147 call_new_payload_with_reth(&auth_provider, version, params).await?;
148
149 let latency =
150 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
151 let new_payload_result = NewPayloadResult {
152 gas_used,
153 latency,
154 persistence_wait: server_timings
155 .as_ref()
156 .map(|t| t.persistence_wait)
157 .unwrap_or_default(),
158 execution_cache_wait: server_timings
159 .as_ref()
160 .map(|t| t.execution_cache_wait)
161 .unwrap_or_default(),
162 sparse_trie_wait: server_timings
163 .as_ref()
164 .map(|t| t.sparse_trie_wait)
165 .unwrap_or_default(),
166 };
167 blocks_processed += 1;
168 let progress = match total_blocks {
169 Some(total) => format!("{blocks_processed}/{total}"),
170 None => format!("{blocks_processed}"),
171 };
172 info!(target: "reth-bench", progress, %new_payload_result);
173
174 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
177
178 let row =
180 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
181 results.push((row, new_payload_result));
182
183 if let Some(scraper) = metrics_scraper.as_mut() &&
184 let Err(err) = scraper.scrape_after_block(block_number).await
185 {
186 tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
187 }
188 }
189
190 if let Ok(error) = error_receiver.try_recv() {
192 return Err(error);
193 }
194
195 let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
196 results.into_iter().unzip();
197
198 if let Some(path) = self.benchmark.output {
200 let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
202 info!(target: "reth-bench", "Writing newPayload call latency output to file: {:?}", output_path);
203 let mut writer = Writer::from_path(output_path)?;
204 for result in new_payload_results {
205 writer.serialize(result)?;
206 }
207 writer.flush()?;
208
209 let output_path = path.join(GAS_OUTPUT_SUFFIX);
211 info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
212 let mut writer = Writer::from_path(output_path)?;
213 for row in &gas_output_results {
214 writer.serialize(row)?;
215 }
216 writer.flush()?;
217
218 if let Some(scraper) = &metrics_scraper {
219 scraper.write_csv(&path)?;
220 }
221
222 info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
223 }
224
225 let gas_output = TotalGasOutput::new(gas_output_results)?;
227 info!(
228 target: "reth-bench",
229 total_duration=?gas_output.total_duration,
230 total_gas_used=?gas_output.total_gas_used,
231 blocks_processed=?gas_output.blocks_processed,
232 "Total Ggas/s: {:.4}",
233 gas_output.total_gigagas_per_second()
234 );
235
236 Ok(())
237 }
238}