1use crate::{
13 bench::{
14 context::BenchContext,
15 output::{
16 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
17 },
18 },
19 valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
20};
21use alloy_eips::BlockNumHash;
22use alloy_network::Ethereum;
23use alloy_provider::{Provider, RootProvider};
24use alloy_pubsub::SubscriptionStream;
25use alloy_rpc_client::RpcClient;
26use alloy_rpc_types_engine::ForkchoiceState;
27use alloy_transport_ws::WsConnect;
28use clap::Parser;
29use eyre::{Context, OptionExt};
30use futures::StreamExt;
31use humantime::parse_duration;
32use reth_cli_runner::CliContext;
33use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
34use reth_node_core::args::BenchmarkArgs;
35use std::time::{Duration, Instant};
36use tracing::{debug, info};
37use url::Url;
38
/// Maximum time a single persistence wait may take.
///
/// Passed to [`PersistenceWaiter::with_subscription`]; when the target block is not reported as
/// persisted within this window, the wait fails with a timeout error.
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
40
/// Benchmark command that sends each fetched block to the engine as a new payload followed by a
/// forkchoice update.
#[derive(Debug, Parser)]
pub struct Command {
    /// RPC URL handed to the benchmark context, which provides the blocks to replay.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    rpc_url: String,

    /// Fixed delay to sleep after every block. Takes precedence over `--wait-for-persistence`.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    /// Wait for blocks to be persisted before continuing.
    ///
    /// When enabled (and `--wait-time` is not set), the benchmark subscribes to
    /// `reth_subscribePersistedBlock` over WebSocket and, after every
    /// `persistence-threshold + 1` blocks, waits until the last sent block is reported as
    /// persisted.
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    wait_for_persistence: bool,

    /// Number of blocks sent between persistence waits, minus one.
    ///
    /// Only used together with `--wait-for-persistence`: a wait happens after every
    /// `persistence-threshold + 1` blocks.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    persistence_threshold: u64,

    /// Capacity of the channel that buffers blocks fetched ahead of the engine calls.
    #[arg(
        long = "rpc-block-buffer-size",
        value_name = "RPC_BLOCK_BUFFER_SIZE",
        default_value = "20",
        verbatim_doc_comment
    )]
    rpc_block_buffer_size: usize,

    /// Shared benchmark arguments (output path, engine RPC URL, optional WebSocket RPC URL, ...).
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
88
89impl Command {
90 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
92 if let Some(duration) = self.wait_time {
94 info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
95 }
96 if self.wait_for_persistence {
97 info!(
98 "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
99 self.persistence_threshold + 1,
100 self.persistence_threshold
101 );
102 }
103
104 let mut waiter = match (self.wait_time, self.wait_for_persistence) {
106 (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
107 (None, true) => {
108 let sub = self.setup_persistence_subscription().await?;
109 Some(PersistenceWaiter::with_subscription(
110 sub,
111 self.persistence_threshold,
112 PERSISTENCE_CHECKPOINT_TIMEOUT,
113 ))
114 }
115 (None, false) => None,
116 };
117
118 let BenchContext {
119 benchmark_mode,
120 block_provider,
121 auth_provider,
122 mut next_block,
123 is_optimism,
124 ..
125 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
126
127 let buffer_size = self.rpc_block_buffer_size;
128
129 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
131 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
132
133 tokio::task::spawn(async move {
134 while benchmark_mode.contains(next_block) {
135 let block_res = block_provider
136 .get_block_by_number(next_block.into())
137 .full()
138 .await
139 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
140 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
141 Ok(block) => block,
142 Err(e) => {
143 tracing::error!("Failed to fetch block {next_block}: {e}");
144 let _ = error_sender.send(e);
145 break;
146 }
147 };
148
149 let head_block_hash = block.header.hash;
150 let safe_block_hash = block_provider
151 .get_block_by_number(block.header.number.saturating_sub(32).into());
152
153 let finalized_block_hash = block_provider
154 .get_block_by_number(block.header.number.saturating_sub(64).into());
155
156 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
157
158 let safe_block_hash = match safe {
159 Ok(Some(block)) => block.header.hash,
160 Ok(None) | Err(_) => head_block_hash,
161 };
162
163 let finalized_block_hash = match finalized {
164 Ok(Some(block)) => block.header.hash,
165 Ok(None) | Err(_) => head_block_hash,
166 };
167
168 next_block += 1;
169 if let Err(e) = sender
170 .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
171 .await
172 {
173 tracing::error!("Failed to send block data: {e}");
174 break;
175 }
176 }
177 });
178
179 let mut results = Vec::new();
180 let total_benchmark_duration = Instant::now();
181 let mut total_wait_time = Duration::ZERO;
182
183 while let Some((block, head, safe, finalized)) = {
184 let wait_start = Instant::now();
185 let result = receiver.recv().await;
186 total_wait_time += wait_start.elapsed();
187 result
188 } {
189 let gas_used = block.header.gas_used;
190 let gas_limit = block.header.gas_limit;
191 let block_number = block.header.number;
192 let transaction_count = block.transactions.len() as u64;
193
194 debug!(target: "reth-bench", ?block_number, "Sending payload");
195
196 let forkchoice_state = ForkchoiceState {
197 head_block_hash: head,
198 safe_block_hash: safe,
199 finalized_block_hash: finalized,
200 };
201
202 let (version, params) = block_to_new_payload(block, is_optimism)?;
203 let start = Instant::now();
204 call_new_payload(&auth_provider, version, params).await?;
205
206 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
207
208 call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
209
210 let total_latency = start.elapsed();
211 let fcu_latency = total_latency - new_payload_result.latency;
212 let combined_result = CombinedResult {
213 block_number,
214 gas_limit,
215 transaction_count,
216 new_payload_result,
217 fcu_latency,
218 total_latency,
219 };
220
221 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
224 info!(%combined_result);
225
226 if let Some(w) = &mut waiter {
227 w.on_block(block_number).await?;
228 }
229
230 let gas_row =
231 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
232 results.push((gas_row, combined_result));
233 }
234
235 if let Ok(error) = error_receiver.try_recv() {
237 return Err(error);
238 }
239
240 drop(waiter);
243
244 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
245 results.into_iter().unzip();
246
247 if let Some(ref path) = self.benchmark.output {
248 write_benchmark_results(path, &gas_output_results, combined_results)?;
249 }
250
251 let gas_output = TotalGasOutput::new(gas_output_results)?;
252
253 info!(
254 total_duration=?gas_output.total_duration,
255 total_gas_used=?gas_output.total_gas_used,
256 blocks_processed=?gas_output.blocks_processed,
257 "Total Ggas/s: {:.4}",
258 gas_output.total_gigagas_per_second()
259 );
260
261 Ok(())
262 }
263
264 fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
275 if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
276 let parsed: Url = ws_url
277 .parse()
278 .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
279 info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
280 Ok(parsed)
281 } else {
282 let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
283 debug!(
284 target: "reth-bench",
285 engine_url = %self.benchmark.engine_rpc_url,
286 %derived,
287 "Derived WebSocket RPC URL from engine RPC URL"
288 );
289 Ok(derived)
290 }
291 }
292
293 async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
295 let ws_url = self.derive_ws_rpc_url()?;
296
297 info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
298
299 let ws_connect = WsConnect::new(ws_url.to_string());
300 let client = RpcClient::connect_pubsub(ws_connect)
301 .await
302 .wrap_err("Failed to connect to WebSocket RPC endpoint")?;
303 let provider: RootProvider<Ethereum> = RootProvider::new(client);
304
305 let subscription = provider
306 .subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
307 .await
308 .wrap_err("Failed to subscribe to persistence notifications")?;
309
310 info!("Subscribed to persistence notifications");
311
312 Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
313 }
314}
315
316fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
327 let url: Url = engine_url
328 .parse()
329 .wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
330
331 let mut ws_url = url.clone();
332
333 match ws_url.scheme() {
334 "http" => ws_url
335 .set_scheme("ws")
336 .map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
337 "https" => ws_url
338 .set_scheme("wss")
339 .map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
340 "ws" | "wss" => {}
341 scheme => {
342 return Err(eyre::eyre!(
343 "Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
344 ))
345 }
346 }
347
348 ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
349
350 Ok(ws_url)
351}
352
353async fn wait_for_persistence(
359 stream: &mut SubscriptionStream<BlockNumHash>,
360 target: u64,
361 last_persisted: &mut u64,
362 timeout: Duration,
363) -> eyre::Result<()> {
364 tokio::time::timeout(timeout, async {
365 while *last_persisted < target {
366 match stream.next().await {
367 Some(persisted) => {
368 *last_persisted = persisted.number;
369 debug!(
370 target: "reth-bench",
371 persisted_block = ?last_persisted,
372 "Received persistence notification"
373 );
374 }
375 None => {
376 return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
377 }
378 }
379 }
380 Ok(())
381 })
382 .await
383 .map_err(|_| {
384 eyre::eyre!(
385 "Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
386 target,
387 timeout,
388 last_persisted
389 )
390 })?
391}
392
/// A persistence notification stream bundled with the provider it was created from.
struct PersistenceSubscription {
    // Never read. NOTE(review): presumably held so the underlying connection stays open for as
    // long as the stream is used — confirm against the provider's drop semantics.
    _provider: RootProvider<Ethereum>,
    /// Stream of persisted-block notifications.
    stream: SubscriptionStream<BlockNumHash>,
}
399
400impl PersistenceSubscription {
401 const fn new(
402 provider: RootProvider<Ethereum>,
403 stream: SubscriptionStream<BlockNumHash>,
404 ) -> Self {
405 Self { _provider: provider, stream }
406 }
407
408 const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
409 &mut self.stream
410 }
411}
412
/// Throttles block submission, either with a fixed delay or by waiting for persistence
/// notifications.
///
/// When `wait_time` is set it is used exclusively; otherwise `subscription` (if any) is used.
struct PersistenceWaiter {
    /// Fixed delay slept after every block; when set, the subscription is not consulted.
    wait_time: Option<Duration>,
    /// Source of persistence notifications for subscription mode.
    subscription: Option<PersistenceSubscription>,
    /// Number of blocks counted by `on_block` in subscription mode.
    blocks_sent: u64,
    /// Block number of the most recent persistence notification received (initially 0).
    last_persisted: u64,
    /// A persistence wait happens after every `threshold + 1` counted blocks.
    threshold: u64,
    /// Maximum duration of a single persistence wait.
    timeout: Duration,
}
428
429impl PersistenceWaiter {
430 const fn with_duration(wait_time: Duration) -> Self {
431 Self {
432 wait_time: Some(wait_time),
433 subscription: None,
434 blocks_sent: 0,
435 last_persisted: 0,
436 threshold: 0,
437 timeout: Duration::ZERO,
438 }
439 }
440
441 const fn with_subscription(
442 subscription: PersistenceSubscription,
443 threshold: u64,
444 timeout: Duration,
445 ) -> Self {
446 Self {
447 wait_time: None,
448 subscription: Some(subscription),
449 blocks_sent: 0,
450 last_persisted: 0,
451 threshold,
452 timeout,
453 }
454 }
455
456 #[allow(clippy::manual_is_multiple_of)]
458 async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
459 if let Some(wait_time) = self.wait_time {
460 tokio::time::sleep(wait_time).await;
461 return Ok(());
462 }
463
464 let Some(ref mut subscription) = self.subscription else {
465 return Ok(());
466 };
467
468 self.blocks_sent += 1;
469
470 if self.blocks_sent % (self.threshold + 1) == 0 {
471 debug!(
472 target: "reth-bench",
473 target_block = ?block_number,
474 last_persisted = self.last_persisted,
475 blocks_sent = self.blocks_sent,
476 "Waiting for persistence"
477 );
478
479 wait_for_persistence(
480 subscription.stream_mut(),
481 block_number,
482 &mut self.last_persisted,
483 self.timeout,
484 )
485 .await?;
486
487 debug!(
488 target: "reth-bench",
489 persisted = self.last_persisted,
490 "Persistence caught up"
491 );
492 }
493
494 Ok(())
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks scheme conversion, port replacement and rejection of unsupported input.
    #[test]
    fn test_engine_url_to_ws_url() {
        // http/https are rewritten to ws/wss and the port becomes 8546.
        let conversions = [
            ("http://localhost:8551", "ws://localhost:8546/"),
            ("https://localhost:8551", "wss://localhost:8546/"),
        ];
        for (engine_url, expected) in conversions {
            let converted = engine_url_to_ws_url(engine_url).unwrap();
            assert_eq!(converted.as_str(), expected);
        }

        // Any port is replaced with 8546.
        let custom_port = engine_url_to_ws_url("http://localhost:9551").unwrap();
        assert_eq!(custom_port.port(), Some(8546));

        // A ws URL keeps its scheme.
        let already_ws = engine_url_to_ws_url("ws://localhost:8546").unwrap();
        assert_eq!(already_ws.scheme(), "ws");

        // Unsupported schemes and unparsable input are rejected.
        for invalid in ["ftp://localhost:8551", "not a valid url"] {
            assert!(engine_url_to_ws_url(invalid).is_err());
        }
    }

    /// In wait-time mode every `on_block` call sleeps for the configured duration.
    #[tokio::test]
    async fn test_waiter_with_duration() {
        let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));

        let started = Instant::now();
        for block_number in 1..=3 {
            waiter.on_block(block_number).await.unwrap();
        }

        assert!(started.elapsed() >= Duration::from_millis(3));
    }
}
537}