Skip to main content

reth_bench/bench/
new_payload_fcu.rs

//! Runs the `reth bench` command, first calling `newPayload` for each block and then calling
//! `forkchoiceUpdated`.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        helpers::parse_duration,
8        metrics_scraper::MetricsScraper,
9        output::{
10            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
11        },
12    },
13    valid_payload::{
14        block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
15    },
16};
17use alloy_provider::{ext::DebugApi, Provider};
18use alloy_rpc_types_engine::ForkchoiceState;
19use clap::Parser;
20use eyre::{Context, OptionExt};
21use futures::{stream, StreamExt, TryStreamExt};
22use reth_cli_runner::CliContext;
23use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
24use reth_node_core::args::BenchmarkArgs;
25use std::time::{Duration, Instant};
26use tracing::{debug, info, warn};
27
28/// `reth benchmark new-payload-fcu` command
29#[derive(Debug, Parser)]
30pub struct Command {
31    /// The RPC url to use for getting data.
32    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
33    rpc_url: String,
34
35    /// How long to wait after a forkchoice update before sending the next payload.
36    ///
37    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
38    /// milliseconds (e.g. `400`).
39    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
40    wait_time: Option<Duration>,
41
42    /// Engine persistence threshold used for deciding when to wait for persistence.
43    ///
44    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
45    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
46    /// at blocks 3, 6, 9, etc.
47    #[arg(
48        long = "persistence-threshold",
49        value_name = "PERSISTENCE_THRESHOLD",
50        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
51        verbatim_doc_comment
52    )]
53    persistence_threshold: u64,
54
55    /// Timeout for waiting on persistence at each checkpoint.
56    ///
57    /// Must be long enough to account for the persistence thread being blocked
58    /// by pruning after the previous save.
59    #[arg(
60        long = "persistence-timeout",
61        value_name = "PERSISTENCE_TIMEOUT",
62        value_parser = parse_duration,
63        default_value = "120s",
64        verbatim_doc_comment
65    )]
66    persistence_timeout: Duration,
67
68    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
69    /// endpoint.
70    #[arg(
71        long = "rpc-block-buffer-size",
72        value_name = "RPC_BLOCK_BUFFER_SIZE",
73        default_value = "20",
74        verbatim_doc_comment
75    )]
76    rpc_block_buffer_size: usize,
77
78    #[command(flatten)]
79    benchmark: BenchmarkArgs,
80}
81
82impl Command {
83    /// Execute `benchmark new-payload-fcu` command
84    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
85        // Log mode configuration
86        if let Some(duration) = self.wait_time {
87            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
88        }
89
90        let BenchContext {
91            benchmark_mode,
92            block_provider,
93            auth_provider,
94            next_block,
95            is_optimism,
96            use_reth_namespace,
97            rlp_blocks,
98            wait_for_persistence,
99            no_wait_for_caches,
100        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
101
102        let total_blocks = benchmark_mode.total_blocks();
103
104        let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
105
106        if use_reth_namespace {
107            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
108        }
109
110        let buffer_size = self.rpc_block_buffer_size;
111
112        let mut blocks = Box::pin(
113            stream::iter((next_block..)
114                .take_while(|next_block| {
115                    benchmark_mode.contains(*next_block)
116                }))
117                .map(|next_block| {
118                    let block_provider = block_provider.clone();
119                    async move {
120                        let block_res = block_provider
121                            .get_block_by_number(next_block.into())
122                            .full()
123                            .await
124                            .wrap_err_with(|| {
125                                format!("Failed to fetch block by number {next_block}")
126                            });
127                        let block =
128                            match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
129                                Ok(block) => block,
130                                Err(e) => {
131                                    tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
132                                    return Err(e)
133                                }
134                            };
135
136                        let rlp = if rlp_blocks {
137                            let rlp = match block_provider
138                                .debug_get_raw_block(next_block.into())
139                                .await
140                            {
141                                Ok(rlp) => rlp,
142                                Err(e) => {
143                                    tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
144                                    return Err(e.into())
145                                }
146                            };
147                            Some(rlp)
148                        } else {
149                            None
150                        };
151
152                        let head_block_hash = block.header.hash;
153                        let safe_block_hash = block_provider
154                            .get_block_by_number(block.header.number.saturating_sub(32).into());
155
156                        let finalized_block_hash = block_provider
157                            .get_block_by_number(block.header.number.saturating_sub(64).into());
158
159                        let (safe, finalized) =
160                            tokio::join!(safe_block_hash, finalized_block_hash);
161
162                        let safe_block_hash = match safe {
163                            Ok(Some(block)) => block.header.hash,
164                            Ok(None) | Err(_) => head_block_hash,
165                        };
166
167                        let finalized_block_hash = match finalized {
168                            Ok(Some(block)) => block.header.hash,
169                            Ok(None) | Err(_) => head_block_hash,
170                        };
171
172                        Ok((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
173                    }
174                })
175                .buffered(buffer_size),
176        );
177
178        let mut results = Vec::new();
179        let mut blocks_processed = 0u64;
180        let total_benchmark_duration = Instant::now();
181        let mut total_wait_time = Duration::ZERO;
182
183        while let Some((block, head, safe, finalized, rlp)) = {
184            let wait_start = Instant::now();
185            let result = blocks.try_next().await?;
186            total_wait_time += wait_start.elapsed();
187            result
188        } {
189            let gas_used = block.header.gas_used;
190            let gas_limit = block.header.gas_limit;
191            let block_number = block.header.number;
192            let transaction_count = block.transactions.len() as u64;
193
194            debug!(target: "reth-bench", ?block_number, "Sending payload");
195
196            let forkchoice_state = ForkchoiceState {
197                head_block_hash: head,
198                safe_block_hash: safe,
199                finalized_block_hash: finalized,
200            };
201
202            let (version, params) = block_to_new_payload(
203                block,
204                is_optimism,
205                rlp,
206                use_reth_namespace,
207                wait_for_persistence,
208                no_wait_for_caches,
209            )?;
210            let start = Instant::now();
211            let server_timings =
212                call_new_payload_with_reth(&auth_provider, version, params).await?;
213
214            let np_latency =
215                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
216            let new_payload_result = NewPayloadResult {
217                gas_used,
218                latency: np_latency,
219                persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
220                execution_cache_wait: server_timings
221                    .as_ref()
222                    .map(|t| t.execution_cache_wait)
223                    .unwrap_or_default(),
224                sparse_trie_wait: server_timings
225                    .as_ref()
226                    .map(|t| t.sparse_trie_wait)
227                    .unwrap_or_default(),
228            };
229
230            let fcu_start = Instant::now();
231            call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
232            let fcu_latency = fcu_start.elapsed();
233
234            let total_latency = if server_timings.is_some() {
235                // When using server-side latency for newPayload, derive total from the
236                // independently measured components to avoid mixing server-side and
237                // client-side (network-inclusive) timings.
238                np_latency + fcu_latency
239            } else {
240                start.elapsed()
241            };
242            let combined_result = CombinedResult {
243                block_number,
244                gas_limit,
245                transaction_count,
246                new_payload_result,
247                fcu_latency,
248                total_latency,
249            };
250
251            // Exclude time spent waiting on the block prefetch channel from the benchmark duration.
252            // We want to measure engine throughput, not RPC fetch latency.
253            blocks_processed += 1;
254            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
255            let progress = match total_blocks {
256                Some(total) => format!("{blocks_processed}/{total}"),
257                None => format!("{blocks_processed}"),
258            };
259            info!(target: "reth-bench", progress, %combined_result);
260
261            if let Some(scraper) = metrics_scraper.as_mut() &&
262                let Err(err) = scraper.scrape_after_block(block_number).await
263            {
264                warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
265            }
266
267            if let Some(wait_time) = self.wait_time {
268                tokio::time::sleep(wait_time).await;
269            }
270
271            let gas_row =
272                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
273            results.push((gas_row, combined_result));
274        }
275
276        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
277            results.into_iter().unzip();
278
279        if let Some(ref path) = self.benchmark.output {
280            write_benchmark_results(path, &gas_output_results, &combined_results)?;
281        }
282
283        if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
284            scraper.write_csv(path)?;
285        }
286
287        let gas_output =
288            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
289
290        info!(
291            target: "reth-bench",
292            total_gas_used = gas_output.total_gas_used,
293            total_duration = ?gas_output.total_duration,
294            execution_duration = ?gas_output.execution_duration,
295            blocks_processed = gas_output.blocks_processed,
296            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
297            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
298            "Benchmark complete"
299        );
300
301        Ok(())
302    }
303}