reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        output::{
8            CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9            GAS_OUTPUT_SUFFIX,
10        },
11    },
12    valid_payload::{call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState};
16use clap::Parser;
17use csv::Writer;
18use humantime::parse_duration;
19use reth_cli_runner::CliContext;
20use reth_node_core::args::BenchmarkArgs;
21use std::time::{Duration, Instant};
22use tracing::{debug, info};
23
/// `reth benchmark new-payload-fcu` command
// NOTE(review): the `///` comments on this struct and its fields are intentionally left
// untouched. With clap's derive they are used as the CLI help text, so editing them would
// change what the program prints. Maintainer-facing notes therefore use plain `//` comments.
#[derive(Debug, Parser)]
pub struct Command {
    /// The RPC url to use for getting data.
    // Passed by value to `BenchContext::new` at the start of `execute`.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    rpc_url: String,

    /// How long to wait after a forkchoice update before sending the next payload.
    // Optional; the raw argument is parsed with `humantime::parse_duration`. When set, `execute`
    // sleeps for this duration after each block's `forkchoiceUpdated` call.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    // Shared benchmark arguments, flattened into this command. `execute` hands them to
    // `BenchContext::new` and joins `output` (when present) with the CSV file suffixes to decide
    // where results are written.
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
38
39impl Command {
40    /// Execute `benchmark new-payload-fcu` command
41    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
42        let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
43            BenchContext::new(&self.benchmark, self.rpc_url).await?;
44
45        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
46        tokio::task::spawn(async move {
47            while benchmark_mode.contains(next_block) {
48                let block_res = block_provider.get_block_by_number(next_block.into()).full().await;
49                let block = block_res.unwrap().unwrap();
50
51                let block = block
52                    .into_inner()
53                    .map_header(|header| header.map(|h| h.into_header_with_defaults()))
54                    .try_map_transactions(|tx| {
55                        // try to convert unknowns into op type so that we can also support optimism
56                        tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
57                    })
58                    .unwrap()
59                    .into_consensus();
60
61                let blob_versioned_hashes =
62                    block.body.blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
63
64                // Convert to execution payload
65                let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
66                let header = block.header;
67                let head_block_hash = payload.block_hash();
68                let safe_block_hash =
69                    block_provider.get_block_by_number(header.number.saturating_sub(32).into());
70
71                let finalized_block_hash =
72                    block_provider.get_block_by_number(header.number.saturating_sub(64).into());
73
74                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
75
76                let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
77                let finalized_block_hash =
78                    finalized.unwrap().expect("finalized block exists").header.hash;
79
80                next_block += 1;
81                sender
82                    .send((
83                        header,
84                        blob_versioned_hashes,
85                        payload,
86                        sidecar,
87                        head_block_hash,
88                        safe_block_hash,
89                        finalized_block_hash,
90                    ))
91                    .await
92                    .unwrap();
93            }
94        });
95
96        // put results in a summary vec so they can be printed at the end
97        let mut results = Vec::new();
98        let total_benchmark_duration = Instant::now();
99        let mut total_wait_time = Duration::ZERO;
100
101        while let Some((header, versioned_hashes, payload, sidecar, head, safe, finalized)) = {
102            let wait_start = Instant::now();
103            let result = receiver.recv().await;
104            total_wait_time += wait_start.elapsed();
105            result
106        } {
107            // just put gas used here
108            let gas_used = header.gas_used;
109            let block_number = header.number;
110
111            debug!(target: "reth-bench", ?block_number, "Sending payload",);
112
113            // construct fcu to call
114            let forkchoice_state = ForkchoiceState {
115                head_block_hash: head,
116                safe_block_hash: safe,
117                finalized_block_hash: finalized,
118            };
119
120            let start = Instant::now();
121            let message_version = call_new_payload(
122                &auth_provider,
123                payload,
124                sidecar,
125                header.parent_beacon_block_root,
126                versioned_hashes,
127            )
128            .await?;
129
130            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
131
132            call_forkchoice_updated(&auth_provider, message_version, forkchoice_state, None)
133                .await?;
134
135            // calculate the total duration and the fcu latency, record
136            let total_latency = start.elapsed();
137            let fcu_latency = total_latency - new_payload_result.latency;
138            let combined_result =
139                CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
140
141            // current duration since the start of the benchmark minus the time
142            // waiting for blocks
143            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
144
145            // convert gas used to gigagas, then compute gigagas per second
146            info!(%combined_result);
147
148            // wait if we need to
149            if let Some(wait_time) = self.wait_time {
150                tokio::time::sleep(wait_time).await;
151            }
152
153            // record the current result
154            let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
155            results.push((gas_row, combined_result));
156        }
157
158        let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
159            results.into_iter().unzip();
160
161        // write the csv output to files
162        if let Some(path) = self.benchmark.output {
163            // first write the combined results to a file
164            let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
165            info!("Writing engine api call latency output to file: {:?}", output_path);
166            let mut writer = Writer::from_path(output_path)?;
167            for result in combined_results {
168                writer.serialize(result)?;
169            }
170            writer.flush()?;
171
172            // now write the gas output to a file
173            let output_path = path.join(GAS_OUTPUT_SUFFIX);
174            info!("Writing total gas output to file: {:?}", output_path);
175            let mut writer = Writer::from_path(output_path)?;
176            for row in &gas_output_results {
177                writer.serialize(row)?;
178            }
179            writer.flush()?;
180
181            info!("Finished writing benchmark output files to {:?}.", path);
182        }
183
184        // accumulate the results and calculate the overall Ggas/s
185        let gas_output = TotalGasOutput::new(gas_output_results);
186        info!(
187            total_duration=?gas_output.total_duration,
188            total_gas_used=?gas_output.total_gas_used,
189            blocks_processed=?gas_output.blocks_processed,
190            "Total Ggas/s: {:.4}",
191            gas_output.total_gigagas_per_second()
192        );
193
194        Ok(())
195    }
196}