reth_bench/bench/
new_payload_only.rs

1//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
2
3use crate::{
4    bench::{
5        context::BenchContext,
6        output::{
7            NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8            NEW_PAYLOAD_OUTPUT_SUFFIX,
9        },
10    },
11    valid_payload::{block_to_new_payload, call_new_payload},
12};
13use alloy_provider::Provider;
14use clap::Parser;
15use csv::Writer;
16use eyre::Context;
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use std::time::{Duration, Instant};
20use tracing::{debug, info};
21
// CLI arguments for the benchmark mode that sends only `newPayload` requests.
//
// NOTE: the `///` comments in this block are picked up by clap's derive macro as help text
// (`verbatim_doc_comment` is set on `rpc_url`), so they are part of the program's output.
// Maintainer notes are therefore written as plain `//` comments.
/// `reth benchmark new-payload-only` command
#[derive(Debug, Parser)]
pub struct Command {
    // Exposed as a long option (`long`) with `RPC_URL` as the value placeholder; `execute`
    // hands this string to `BenchContext::new`.
    /// The RPC url to use for getting data.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    rpc_url: String,

    // Benchmark options declared on `BenchmarkArgs`, flattened into this command's own
    // argument list. `execute` passes them to `BenchContext::new` and reads the `output` field
    // to decide whether CSV files are written.
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
32
33impl Command {
34    /// Execute `benchmark new-payload-only` command
35    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
36        let BenchContext {
37            benchmark_mode,
38            block_provider,
39            auth_provider,
40            mut next_block,
41            is_optimism,
42        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
43
44        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
45        tokio::task::spawn(async move {
46            while benchmark_mode.contains(next_block) {
47                let block_res = block_provider
48                    .get_block_by_number(next_block.into())
49                    .full()
50                    .await
51                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
52                let block = block_res.unwrap().unwrap();
53                let header = block.header.clone();
54
55                let (version, params) = block_to_new_payload(block, is_optimism).unwrap();
56
57                next_block += 1;
58                sender.send((header, version, params)).await.unwrap();
59            }
60        });
61
62        // put results in a summary vec so they can be printed at the end
63        let mut results = Vec::new();
64        let total_benchmark_duration = Instant::now();
65        let mut total_wait_time = Duration::ZERO;
66
67        while let Some((header, version, params)) = {
68            let wait_start = Instant::now();
69            let result = receiver.recv().await;
70            total_wait_time += wait_start.elapsed();
71            result
72        } {
73            // just put gas used here
74            let gas_used = header.gas_used;
75
76            let block_number = header.number;
77
78            debug!(
79                target: "reth-bench",
80                number=?header.number,
81                "Sending payload to engine",
82            );
83
84            let start = Instant::now();
85            call_new_payload(&auth_provider, version, params).await?;
86
87            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
88            info!(%new_payload_result);
89
90            // current duration since the start of the benchmark minus the time
91            // waiting for blocks
92            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
93
94            // record the current result
95            let row = TotalGasRow { block_number, gas_used, time: current_duration };
96            results.push((row, new_payload_result));
97        }
98
99        let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
100            results.into_iter().unzip();
101
102        // write the csv output to files
103        if let Some(path) = self.benchmark.output {
104            // first write the new payload results to a file
105            let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
106            info!("Writing newPayload call latency output to file: {:?}", output_path);
107            let mut writer = Writer::from_path(output_path)?;
108            for result in new_payload_results {
109                writer.serialize(result)?;
110            }
111            writer.flush()?;
112
113            // now write the gas output to a file
114            let output_path = path.join(GAS_OUTPUT_SUFFIX);
115            info!("Writing total gas output to file: {:?}", output_path);
116            let mut writer = Writer::from_path(output_path)?;
117            for row in &gas_output_results {
118                writer.serialize(row)?;
119            }
120            writer.flush()?;
121
122            info!("Finished writing benchmark output files to {:?}.", path);
123        }
124
125        // accumulate the results and calculate the overall Ggas/s
126        let gas_output = TotalGasOutput::new(gas_output_results);
127        info!(
128            total_duration=?gas_output.total_duration,
129            total_gas_used=?gas_output.total_gas_used,
130            blocks_processed=?gas_output.blocks_processed,
131            "Total Ggas/s: {:.4}",
132            gas_output.total_gigagas_per_second()
133        );
134
135        Ok(())
136    }
137}