Skip to main content

reth_bench/bench/
new_payload_only.rs

1//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
2
3use crate::{
4    bench::{
5        context::BenchContext,
6        metrics_scraper::MetricsScraper,
7        output::{
8            NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
9            NEW_PAYLOAD_OUTPUT_SUFFIX,
10        },
11    },
12    valid_payload::{block_to_new_payload, call_new_payload_with_reth},
13};
14use alloy_provider::{ext::DebugApi, Provider};
15use clap::Parser;
16use csv::Writer;
17use eyre::{Context, OptionExt};
18use reth_cli_runner::CliContext;
19use reth_node_core::args::BenchmarkArgs;
20use std::time::{Duration, Instant};
21use tracing::{debug, info};
22
23/// `reth benchmark new-payload-only` command
24#[derive(Debug, Parser)]
25pub struct Command {
26    /// The RPC url to use for getting data.
27    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
28    rpc_url: String,
29
30    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
31    /// endpoint.
32    #[arg(
33        long = "rpc-block-buffer-size",
34        value_name = "RPC_BLOCK_BUFFER_SIZE",
35        default_value = "20",
36        verbatim_doc_comment
37    )]
38    rpc_block_buffer_size: usize,
39
40    #[command(flatten)]
41    benchmark: BenchmarkArgs,
42}
43
44impl Command {
45    /// Execute `benchmark new-payload-only` command
46    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
47        let BenchContext {
48            benchmark_mode,
49            block_provider,
50            auth_provider,
51            mut next_block,
52            use_reth_namespace,
53            rlp_blocks,
54            wait_for_persistence,
55            no_wait_for_caches,
56        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
57
58        let total_blocks = benchmark_mode.total_blocks();
59
60        let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
61
62        if use_reth_namespace {
63            info!("Using reth_newPayload endpoint");
64        }
65
66        let buffer_size = self.rpc_block_buffer_size;
67
68        // Use a oneshot channel to propagate errors from the spawned task
69        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
70        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
71
72        tokio::task::spawn(async move {
73            while benchmark_mode.contains(next_block) {
74                let block_res = block_provider
75                    .get_block_by_number(next_block.into())
76                    .full()
77                    .await
78                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
79                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
80                    Ok(block) => block,
81                    Err(e) => {
82                        tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
83                        let _ = error_sender.send(e);
84                        break;
85                    }
86                };
87
88                let rlp = if rlp_blocks {
89                    let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
90                    else {
91                        tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
92                        let _ = error_sender
93                            .send(eyre::eyre!("Failed to fetch raw block {next_block}"));
94                        break;
95                    };
96                    Some(rlp)
97                } else {
98                    None
99                };
100
101                next_block += 1;
102                if let Err(e) = sender.send((block, rlp)).await {
103                    tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
104                    break;
105                }
106            }
107        });
108
109        let mut results = Vec::new();
110        let mut blocks_processed = 0u64;
111        let total_benchmark_duration = Instant::now();
112        let mut total_wait_time = Duration::ZERO;
113
114        while let Some((block, rlp)) = {
115            let wait_start = Instant::now();
116            let result = receiver.recv().await;
117            total_wait_time += wait_start.elapsed();
118            result
119        } {
120            let block_number = block.header.number;
121            let transaction_count = block.transactions.len() as u64;
122            let gas_used = block.header.gas_used;
123
124            debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
125
126            let (version, params) = block_to_new_payload(
127                block,
128                rlp,
129                use_reth_namespace,
130                wait_for_persistence,
131                no_wait_for_caches,
132            )?;
133
134            let start = Instant::now();
135            let server_timings =
136                call_new_payload_with_reth(&auth_provider, version, params).await?;
137
138            let latency =
139                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
140            let new_payload_result = NewPayloadResult {
141                gas_used,
142                latency,
143                persistence_wait: server_timings
144                    .as_ref()
145                    .map(|t| t.persistence_wait)
146                    .unwrap_or_default(),
147                execution_cache_wait: server_timings
148                    .as_ref()
149                    .map(|t| t.execution_cache_wait)
150                    .unwrap_or_default(),
151                sparse_trie_wait: server_timings
152                    .as_ref()
153                    .map(|t| t.sparse_trie_wait)
154                    .unwrap_or_default(),
155            };
156            blocks_processed += 1;
157            let progress = match total_blocks {
158                Some(total) => format!("{blocks_processed}/{total}"),
159                None => format!("{blocks_processed}"),
160            };
161            info!(target: "reth-bench", progress, %new_payload_result);
162
163            // current duration since the start of the benchmark minus the time
164            // waiting for blocks
165            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
166
167            // record the current result
168            let row =
169                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
170            results.push((row, new_payload_result));
171
172            if let Some(scraper) = metrics_scraper.as_mut() &&
173                let Err(err) = scraper.scrape_after_block(block_number).await
174            {
175                tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
176            }
177        }
178
179        // Check if the spawned task encountered an error
180        if let Ok(error) = error_receiver.try_recv() {
181            return Err(error);
182        }
183
184        let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
185            results.into_iter().unzip();
186
187        // write the csv output to files
188        if let Some(path) = self.benchmark.output {
189            // first write the new payload results to a file
190            let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
191            info!(target: "reth-bench", "Writing newPayload call latency output to file: {:?}", output_path);
192            let mut writer = Writer::from_path(output_path)?;
193            for result in new_payload_results {
194                writer.serialize(result)?;
195            }
196            writer.flush()?;
197
198            // now write the gas output to a file
199            let output_path = path.join(GAS_OUTPUT_SUFFIX);
200            info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
201            let mut writer = Writer::from_path(output_path)?;
202            for row in &gas_output_results {
203                writer.serialize(row)?;
204            }
205            writer.flush()?;
206
207            if let Some(scraper) = &metrics_scraper {
208                scraper.write_csv(&path)?;
209            }
210
211            info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
212        }
213
214        // accumulate the results and calculate the overall Ggas/s
215        let gas_output = TotalGasOutput::new(gas_output_results)?;
216        info!(
217            target: "reth-bench",
218            total_duration=?gas_output.total_duration,
219            total_gas_used=?gas_output.total_gas_used,
220            blocks_processed=?gas_output.blocks_processed,
221            "Total Ggas/s: {:.4}",
222            gas_output.total_gigagas_per_second()
223        );
224
225        Ok(())
226    }
227}