reth_bench/bench/
new_payload_only.rs

1//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
2
3use crate::{
4    bench::{
5        context::BenchContext,
6        output::{
7            NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8            NEW_PAYLOAD_OUTPUT_SUFFIX,
9        },
10    },
11    valid_payload::call_new_payload,
12};
13use alloy_provider::Provider;
14use alloy_rpc_types_engine::ExecutionPayload;
15use clap::Parser;
16use csv::Writer;
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use std::time::{Duration, Instant};
20use tracing::{debug, info};
21
22/// `reth benchmark new-payload-only` command
23#[derive(Debug, Parser)]
24pub struct Command {
25    /// The RPC url to use for getting data.
26    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
27    rpc_url: String,
28
29    #[command(flatten)]
30    benchmark: BenchmarkArgs,
31}
32
33impl Command {
34    /// Execute `benchmark new-payload-only` command
35    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
36        // TODO: this could be just a function I guess, but destructuring makes the code slightly
37        // more readable than a 4 element tuple.
38        let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
39            BenchContext::new(&self.benchmark, self.rpc_url).await?;
40
41        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
42        tokio::task::spawn(async move {
43            while benchmark_mode.contains(next_block) {
44                let block_res = block_provider.get_block_by_number(next_block.into()).full().await;
45                let block = block_res.unwrap().unwrap();
46                let block = block
47                    .into_inner()
48                    .map_header(|header| header.map(|h| h.into_header_with_defaults()))
49                    .try_map_transactions(|tx| {
50                        tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
51                    })
52                    .unwrap()
53                    .into_consensus();
54
55                let blob_versioned_hashes =
56                    block.body.blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
57                let payload = ExecutionPayload::from_block_slow(&block).0;
58
59                next_block += 1;
60                sender.send((block.header, blob_versioned_hashes, payload)).await.unwrap();
61            }
62        });
63
64        // put results in a summary vec so they can be printed at the end
65        let mut results = Vec::new();
66        let total_benchmark_duration = Instant::now();
67        let mut total_wait_time = Duration::ZERO;
68
69        while let Some((header, versioned_hashes, payload)) = {
70            let wait_start = Instant::now();
71            let result = receiver.recv().await;
72            total_wait_time += wait_start.elapsed();
73            result
74        } {
75            // just put gas used here
76            let gas_used = header.gas_used;
77
78            let block_number = payload.block_number();
79
80            debug!(
81                target: "reth-bench",
82                number=?header.number,
83                "Sending payload to engine",
84            );
85
86            let start = Instant::now();
87            call_new_payload(
88                &auth_provider,
89                payload,
90                header.parent_beacon_block_root,
91                versioned_hashes,
92            )
93            .await?;
94
95            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
96            info!(%new_payload_result);
97
98            // current duration since the start of the benchmark minus the time
99            // waiting for blocks
100            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
101
102            // record the current result
103            let row = TotalGasRow { block_number, gas_used, time: current_duration };
104            results.push((row, new_payload_result));
105        }
106
107        let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
108            results.into_iter().unzip();
109
110        // write the csv output to files
111        if let Some(path) = self.benchmark.output {
112            // first write the new payload results to a file
113            let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
114            info!("Writing newPayload call latency output to file: {:?}", output_path);
115            let mut writer = Writer::from_path(output_path)?;
116            for result in new_payload_results {
117                writer.serialize(result)?;
118            }
119            writer.flush()?;
120
121            // now write the gas output to a file
122            let output_path = path.join(GAS_OUTPUT_SUFFIX);
123            info!("Writing total gas output to file: {:?}", output_path);
124            let mut writer = Writer::from_path(output_path)?;
125            for row in &gas_output_results {
126                writer.serialize(row)?;
127            }
128            writer.flush()?;
129
130            info!("Finished writing benchmark output files to {:?}.", path);
131        }
132
133        // accumulate the results and calculate the overall Ggas/s
134        let gas_output = TotalGasOutput::new(gas_output_results);
135        info!(
136            total_duration=?gas_output.total_duration,
137            total_gas_used=?gas_output.total_gas_used,
138            blocks_processed=?gas_output.blocks_processed,
139            "Total Ggas/s: {:.4}",
140            gas_output.total_gigagas_per_second()
141        );
142
143        Ok(())
144    }
145}