// reth_bench/bench/new_payload_fcu.rs
//! Runs the `reth bench` command, calling first newPayload for each block, then calling
//! forkchoiceUpdated.
//!
//! Supports configurable waiting behavior:
//! - **`--wait-time`**: Fixed sleep interval between blocks.
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
//!   `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
//!   threshold. This ensures the benchmark doesn't outpace persistence.
//!
//! Both options can be used together or independently.
use crate::{
    bench::{
        context::BenchContext,
        helpers::parse_duration,
        metrics_scraper::MetricsScraper,
        output::{
            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
        },
        persistence_waiter::{
            derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
        },
    },
    valid_payload::{
        block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
    },
};
use alloy_provider::{ext::DebugApi, Provider};
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use eyre::{Context, OptionExt};
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};

38/// `reth benchmark new-payload-fcu` command
39#[derive(Debug, Parser)]
40pub struct Command {
41    /// The RPC url to use for getting data.
42    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
43    rpc_url: String,
44
45    /// How long to wait after a forkchoice update before sending the next payload.
46    ///
47    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
48    /// milliseconds (e.g. `400`).
49    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
50    wait_time: Option<Duration>,
51
52    /// Wait for blocks to be persisted before sending the next batch.
53    ///
54    /// When enabled, waits for every Nth block to be persisted using the
55    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
56    /// doesn't outpace persistence.
57    ///
58    /// The subscription uses the regular RPC websocket endpoint (no JWT required).
59    #[arg(long, default_value = "false", verbatim_doc_comment)]
60    wait_for_persistence: bool,
61
62    /// Engine persistence threshold used for deciding when to wait for persistence.
63    ///
64    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
65    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
66    /// at blocks 3, 6, 9, etc.
67    #[arg(
68        long = "persistence-threshold",
69        value_name = "PERSISTENCE_THRESHOLD",
70        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
71        verbatim_doc_comment
72    )]
73    persistence_threshold: u64,
74
75    /// Timeout for waiting on persistence at each checkpoint.
76    ///
77    /// Must be long enough to account for the persistence thread being blocked
78    /// by pruning after the previous save.
79    #[arg(
80        long = "persistence-timeout",
81        value_name = "PERSISTENCE_TIMEOUT",
82        value_parser = parse_duration,
83        default_value = "120s",
84        verbatim_doc_comment
85    )]
86    persistence_timeout: Duration,
87
88    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
89    /// endpoint.
90    #[arg(
91        long = "rpc-block-buffer-size",
92        value_name = "RPC_BLOCK_BUFFER_SIZE",
93        default_value = "20",
94        verbatim_doc_comment
95    )]
96    rpc_block_buffer_size: usize,
97
98    #[command(flatten)]
99    benchmark: BenchmarkArgs,
100}
101
102impl Command {
103    /// Execute `benchmark new-payload-fcu` command
104    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
105        // Log mode configuration
106        if let Some(duration) = self.wait_time {
107            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
108        }
109        if self.wait_for_persistence {
110            info!(
111                target: "reth-bench",
112                "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
113                self.persistence_threshold + 1,
114                self.persistence_threshold
115            );
116        }
117
118        // Set up waiter based on configured options
119        // When both are set: wait at least wait_time, and also wait for persistence if needed
120        let mut waiter = match (self.wait_time, self.wait_for_persistence) {
121            (Some(duration), true) => {
122                let ws_url = derive_ws_rpc_url(
123                    self.benchmark.ws_rpc_url.as_deref(),
124                    &self.benchmark.engine_rpc_url,
125                )?;
126                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
127                Some(PersistenceWaiter::with_duration_and_subscription(
128                    duration,
129                    sub,
130                    self.persistence_threshold,
131                    self.persistence_timeout,
132                ))
133            }
134            (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
135            (None, true) => {
136                let ws_url = derive_ws_rpc_url(
137                    self.benchmark.ws_rpc_url.as_deref(),
138                    &self.benchmark.engine_rpc_url,
139                )?;
140                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
141                Some(PersistenceWaiter::with_subscription(
142                    sub,
143                    self.persistence_threshold,
144                    self.persistence_timeout,
145                ))
146            }
147            (None, false) => None,
148        };
149
150        let BenchContext {
151            benchmark_mode,
152            block_provider,
153            auth_provider,
154            mut next_block,
155            is_optimism,
156            use_reth_namespace,
157            rlp_blocks,
158        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
159
160        let total_blocks = benchmark_mode.total_blocks();
161
162        let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
163
164        if use_reth_namespace {
165            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
166        }
167
168        let buffer_size = self.rpc_block_buffer_size;
169
170        // Use a oneshot channel to propagate errors from the spawned task
171        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
172        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
173
174        tokio::task::spawn(async move {
175            while benchmark_mode.contains(next_block) {
176                let block_res = block_provider
177                    .get_block_by_number(next_block.into())
178                    .full()
179                    .await
180                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
181                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
182                    Ok(block) => block,
183                    Err(e) => {
184                        tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
185                        let _ = error_sender.send(e);
186                        break;
187                    }
188                };
189
190                let rlp = if rlp_blocks {
191                    let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
192                        Ok(rlp) => rlp,
193                        Err(e) => {
194                            tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
195                            let _ = error_sender
196                                .send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
197                            break;
198                        }
199                    };
200                    Some(rlp)
201                } else {
202                    None
203                };
204
205                let head_block_hash = block.header.hash;
206                let safe_block_hash = block_provider
207                    .get_block_by_number(block.header.number.saturating_sub(32).into());
208
209                let finalized_block_hash = block_provider
210                    .get_block_by_number(block.header.number.saturating_sub(64).into());
211
212                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
213
214                let safe_block_hash = match safe {
215                    Ok(Some(block)) => block.header.hash,
216                    Ok(None) | Err(_) => head_block_hash,
217                };
218
219                let finalized_block_hash = match finalized {
220                    Ok(Some(block)) => block.header.hash,
221                    Ok(None) | Err(_) => head_block_hash,
222                };
223
224                next_block += 1;
225                if let Err(e) = sender
226                    .send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
227                    .await
228                {
229                    tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
230                    break;
231                }
232            }
233        });
234
235        let mut results = Vec::new();
236        let mut blocks_processed = 0u64;
237        let total_benchmark_duration = Instant::now();
238        let mut total_wait_time = Duration::ZERO;
239
240        while let Some((block, head, safe, finalized, rlp)) = {
241            let wait_start = Instant::now();
242            let result = receiver.recv().await;
243            total_wait_time += wait_start.elapsed();
244            result
245        } {
246            let gas_used = block.header.gas_used;
247            let gas_limit = block.header.gas_limit;
248            let block_number = block.header.number;
249            let transaction_count = block.transactions.len() as u64;
250
251            debug!(target: "reth-bench", ?block_number, "Sending payload");
252
253            let forkchoice_state = ForkchoiceState {
254                head_block_hash: head,
255                safe_block_hash: safe,
256                finalized_block_hash: finalized,
257            };
258
259            let (version, params) =
260                block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
261            let start = Instant::now();
262            let server_timings =
263                call_new_payload_with_reth(&auth_provider, version, params).await?;
264
265            let np_latency =
266                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
267            let new_payload_result = NewPayloadResult {
268                gas_used,
269                latency: np_latency,
270                persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
271                execution_cache_wait: server_timings
272                    .as_ref()
273                    .map(|t| t.execution_cache_wait)
274                    .unwrap_or_default(),
275                sparse_trie_wait: server_timings
276                    .as_ref()
277                    .map(|t| t.sparse_trie_wait)
278                    .unwrap_or_default(),
279            };
280
281            let fcu_start = Instant::now();
282            call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
283            let fcu_latency = fcu_start.elapsed();
284
285            let total_latency = if server_timings.is_some() {
286                // When using server-side latency for newPayload, derive total from the
287                // independently measured components to avoid mixing server-side and
288                // client-side (network-inclusive) timings.
289                np_latency + fcu_latency
290            } else {
291                start.elapsed()
292            };
293            let combined_result = CombinedResult {
294                block_number,
295                gas_limit,
296                transaction_count,
297                new_payload_result,
298                fcu_latency,
299                total_latency,
300            };
301
302            // Exclude time spent waiting on the block prefetch channel from the benchmark duration.
303            // We want to measure engine throughput, not RPC fetch latency.
304            blocks_processed += 1;
305            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
306            let progress = match total_blocks {
307                Some(total) => format!("{blocks_processed}/{total}"),
308                None => format!("{blocks_processed}"),
309            };
310            info!(target: "reth-bench", progress, %combined_result);
311
312            if let Some(scraper) = metrics_scraper.as_mut() &&
313                let Err(err) = scraper.scrape_after_block(block_number).await
314            {
315                warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
316            }
317
318            if let Some(w) = &mut waiter {
319                w.on_block(block_number).await?;
320            }
321
322            let gas_row =
323                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
324            results.push((gas_row, combined_result));
325        }
326
327        // Check if the spawned task encountered an error
328        if let Ok(error) = error_receiver.try_recv() {
329            return Err(error);
330        }
331
332        // Drop waiter - we don't need to wait for final blocks to persist
333        // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
334        drop(waiter);
335
336        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
337            results.into_iter().unzip();
338
339        if let Some(ref path) = self.benchmark.output {
340            write_benchmark_results(path, &gas_output_results, &combined_results)?;
341        }
342
343        if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
344            scraper.write_csv(path)?;
345        }
346
347        let gas_output =
348            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
349
350        info!(
351            target: "reth-bench",
352            total_gas_used = gas_output.total_gas_used,
353            total_duration = ?gas_output.total_duration,
354            execution_duration = ?gas_output.execution_duration,
355            blocks_processed = gas_output.blocks_processed,
356            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
357            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
358            "Benchmark complete"
359        );
360
361        Ok(())
362    }
363}