Skip to main content

reth_bench/bench/
output.rs

1//! Contains various benchmark output formats, either for logging or for
2//! serialization to / from files.
3
4use alloy_primitives::B256;
5use csv::Writer;
6use eyre::OptionExt;
7use reth_primitives_traits::constants::GIGAGAS;
8use serde::{ser::SerializeStruct, Deserialize, Serialize};
9use std::{fs, path::Path, time::Duration};
10use tracing::info;
11
12/// This is the suffix for gas output csv files.
13pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
14
15/// This is the suffix for combined output csv files.
16pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv";
17
18/// This is the suffix for new payload output csv files.
19pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
20
21/// Serialized format for gas ramp payloads on disk.
22#[derive(Debug, Serialize, Deserialize)]
23pub(crate) struct GasRampPayloadFile {
24    /// Engine API version (1-5).
25    ///
26    /// `None` indicates that `reth_newPayload` should be used.
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub(crate) version: Option<u8>,
29    /// The block hash for FCU.
30    pub(crate) block_hash: B256,
31    /// The params to pass to newPayload.
32    pub(crate) params: serde_json::Value,
33}
34
35/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
36/// used and the `newPayload` latency.
37#[derive(Debug)]
38pub(crate) struct NewPayloadResult {
39    /// The gas used in the `newPayload` call.
40    pub(crate) gas_used: u64,
41    /// The latency of the `newPayload` call.
42    pub(crate) latency: Duration,
43    /// Time spent waiting for persistence. `None` when no persistence was in-flight.
44    pub(crate) persistence_wait: Option<Duration>,
45    /// Time spent waiting for execution cache lock.
46    pub(crate) execution_cache_wait: Duration,
47    /// Time spent waiting for sparse trie lock.
48    pub(crate) sparse_trie_wait: Duration,
49}
50
51impl NewPayloadResult {
52    /// Returns the gas per second processed in the `newPayload` call.
53    pub(crate) fn gas_per_second(&self) -> f64 {
54        self.gas_used as f64 / self.latency.as_secs_f64()
55    }
56}
57
58impl std::fmt::Display for NewPayloadResult {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(
61            f,
62            "New payload processed at {:.4} Ggas/s, used {} total gas. Latency: {:?}",
63            self.gas_per_second() / GIGAGAS as f64,
64            self.gas_used,
65            self.latency
66        )
67    }
68}
69
70/// This is another [`Serialize`] implementation for the [`NewPayloadResult`] struct, serializing
71/// the duration as microseconds because the csv writer would fail otherwise.
72impl Serialize for NewPayloadResult {
73    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
74    where
75        S: serde::ser::Serializer,
76    {
77        // convert the time to microseconds
78        let time = self.latency.as_micros();
79        let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
80        state.serialize_field("gas_used", &self.gas_used)?;
81        state.serialize_field("latency", &time)?;
82        state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
83        state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
84        state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
85        state.end()
86    }
87}
88
89/// This represents the combined results of a `newPayload` call and a `forkchoiceUpdated` call in
90/// the benchmark, containing the gas used, the `newPayload` latency, and the `forkchoiceUpdated`
91/// latency.
92#[derive(Debug)]
93pub(crate) struct CombinedResult {
94    /// The block number of the block being processed.
95    pub(crate) block_number: u64,
96    /// The gas limit of the block.
97    pub(crate) gas_limit: u64,
98    /// The number of transactions in the block.
99    pub(crate) transaction_count: u64,
100    /// The `newPayload` result.
101    pub(crate) new_payload_result: NewPayloadResult,
102    /// The latency of the `forkchoiceUpdated` call.
103    pub(crate) fcu_latency: Duration,
104    /// The latency of both calls combined.
105    pub(crate) total_latency: Duration,
106}
107
108impl CombinedResult {
109    /// Returns the gas per second, including the `newPayload` _and_ `forkchoiceUpdated` duration.
110    pub(crate) fn combined_gas_per_second(&self) -> f64 {
111        self.new_payload_result.gas_used as f64 / self.total_latency.as_secs_f64()
112    }
113}
114
115impl std::fmt::Display for CombinedResult {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        let np = &self.new_payload_result;
118        write!(
119            f,
120            "Block {} processed at {:.4} Ggas/s, used {} total gas. Combined: {:.4} Ggas/s. fcu: {:?}, newPayload: {:?}",
121            self.block_number,
122            np.gas_per_second() / GIGAGAS as f64,
123            np.gas_used,
124            self.combined_gas_per_second() / GIGAGAS as f64,
125            self.fcu_latency,
126            np.latency,
127        )?;
128        if !np.execution_cache_wait.is_zero() {
129            write!(f, ", execution cache wait: {:?}", np.execution_cache_wait)?;
130        }
131        if !np.sparse_trie_wait.is_zero() {
132            write!(f, ", trie cache wait: {:?}", np.sparse_trie_wait)?;
133        }
134        if let Some(d) = np.persistence_wait {
135            write!(f, ", persistence wait: {d:?}")?;
136        }
137        Ok(())
138    }
139}
140
141/// This is a [`Serialize`] implementation for the [`CombinedResult`] struct, serializing the
142/// durations as microseconds because the csv writer would fail otherwise.
143impl Serialize for CombinedResult {
144    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
145    where
146        S: serde::ser::Serializer,
147    {
148        // convert the time to microseconds
149        let fcu_latency = self.fcu_latency.as_micros();
150        let new_payload_latency = self.new_payload_result.latency.as_micros();
151        let total_latency = self.total_latency.as_micros();
152        let mut state = serializer.serialize_struct("CombinedResult", 10)?;
153
154        // flatten the new payload result because this is meant for CSV writing
155        state.serialize_field("block_number", &self.block_number)?;
156        state.serialize_field("gas_limit", &self.gas_limit)?;
157        state.serialize_field("transaction_count", &self.transaction_count)?;
158        state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
159        state.serialize_field("new_payload_latency", &new_payload_latency)?;
160        state.serialize_field("fcu_latency", &fcu_latency)?;
161        state.serialize_field("total_latency", &total_latency)?;
162        state.serialize_field(
163            "persistence_wait",
164            &self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
165        )?;
166        state.serialize_field(
167            "execution_cache_wait",
168            &self.new_payload_result.execution_cache_wait.as_micros(),
169        )?;
170        state.serialize_field(
171            "sparse_trie_wait",
172            &self.new_payload_result.sparse_trie_wait.as_micros(),
173        )?;
174        state.end()
175    }
176}
177
178/// This represents a row of total gas data in the benchmark.
179#[derive(Debug)]
180pub(crate) struct TotalGasRow {
181    /// The block number of the block being processed.
182    pub(crate) block_number: u64,
183    /// The number of transactions in the block.
184    pub(crate) transaction_count: u64,
185    /// The total gas used in the block.
186    pub(crate) gas_used: u64,
187    /// Time since the start of the benchmark.
188    pub(crate) time: Duration,
189}
190
191/// This represents the aggregated output, meant to show gas per second metrics, of a benchmark run.
192#[derive(Debug)]
193pub(crate) struct TotalGasOutput {
194    /// The total gas used in the benchmark.
195    pub(crate) total_gas_used: u64,
196    /// The total wall-clock duration of the benchmark (includes wait times).
197    pub(crate) total_duration: Duration,
198    /// The total execution-only duration (excludes wait times).
199    pub(crate) execution_duration: Duration,
200    /// The number of blocks processed.
201    pub(crate) blocks_processed: u64,
202}
203
204impl TotalGasOutput {
205    /// Create a new [`TotalGasOutput`] from gas rows only.
206    ///
207    /// Use this when execution-only timing is not available (e.g., `new_payload_only`).
208    /// `execution_duration` will equal `total_duration`.
209    pub(crate) fn new(rows: Vec<TotalGasRow>) -> eyre::Result<Self> {
210        let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
211        let blocks_processed = rows.len() as u64;
212        let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
213
214        Ok(Self {
215            total_gas_used,
216            total_duration,
217            execution_duration: total_duration,
218            blocks_processed,
219        })
220    }
221
222    /// Create a new [`TotalGasOutput`] from gas rows and combined results.
223    ///
224    /// - `rows`: Used for total gas and wall-clock duration
225    /// - `combined_results`: Used for execution-only duration (sum of `total_latency`)
226    pub(crate) fn with_combined_results(
227        rows: Vec<TotalGasRow>,
228        combined_results: &[CombinedResult],
229    ) -> eyre::Result<Self> {
230        let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
231        let blocks_processed = rows.len() as u64;
232        let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
233
234        // Sum execution-only time from combined results
235        let execution_duration: Duration = combined_results.iter().map(|r| r.total_latency).sum();
236
237        Ok(Self { total_gas_used, total_duration, execution_duration, blocks_processed })
238    }
239
240    /// Return the total gigagas per second based on wall-clock time.
241    pub(crate) fn total_gigagas_per_second(&self) -> f64 {
242        self.total_gas_used as f64 / self.total_duration.as_secs_f64() / GIGAGAS as f64
243    }
244
245    /// Return the execution-only gigagas per second (excludes wait times).
246    pub(crate) fn execution_gigagas_per_second(&self) -> f64 {
247        self.total_gas_used as f64 / self.execution_duration.as_secs_f64() / GIGAGAS as f64
248    }
249}
250
251/// Write benchmark results to CSV files.
252///
253/// Writes two files to the output directory:
254/// - `combined_latency.csv`: Per-block latency results
255/// - `total_gas.csv`: Per-block gas usage over time
256pub(crate) fn write_benchmark_results(
257    output_dir: &Path,
258    gas_results: &[TotalGasRow],
259    combined_results: &[CombinedResult],
260) -> eyre::Result<()> {
261    fs::create_dir_all(output_dir)?;
262
263    let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
264    info!(target: "reth-bench", "Writing engine api call latency output to file: {:?}", output_path);
265    let mut writer = Writer::from_path(&output_path)?;
266    for result in combined_results {
267        writer.serialize(result)?;
268    }
269    writer.flush()?;
270
271    let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
272    info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
273    let mut writer = Writer::from_path(&output_path)?;
274    for row in gas_results {
275        writer.serialize(row)?;
276    }
277    writer.flush()?;
278
279    info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", output_dir);
280    Ok(())
281}
282
283/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
284///
285/// This is essentially just for the csv writer, which would have headers
286impl Serialize for TotalGasRow {
287    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
288    where
289        S: serde::ser::Serializer,
290    {
291        // convert the time to microseconds
292        let time = self.time.as_micros();
293        let mut state = serializer.serialize_struct("TotalGasRow", 4)?;
294        state.serialize_field("block_number", &self.block_number)?;
295        state.serialize_field("transaction_count", &self.transaction_count)?;
296        state.serialize_field("gas_used", &self.gas_used)?;
297        state.serialize_field("time", &time)?;
298        state.end()
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use csv::Writer;
306    use std::io::BufRead;
307
308    #[test]
309    fn test_write_total_gas_row_csv() {
310        let row = TotalGasRow {
311            block_number: 1,
312            transaction_count: 10,
313            gas_used: 1_000,
314            time: Duration::from_secs(1),
315        };
316
317        let mut writer = Writer::from_writer(vec![]);
318        writer.serialize(row).unwrap();
319        let result = writer.into_inner().unwrap();
320
321        // parse into Lines
322        let mut result = result.as_slice().lines();
323
324        // assert header
325        let expected_first_line = "block_number,transaction_count,gas_used,time";
326        let first_line = result.next().unwrap().unwrap();
327        assert_eq!(first_line, expected_first_line);
328
329        let expected_second_line = "1,10,1000,1000000";
330        let second_line = result.next().unwrap().unwrap();
331        assert_eq!(second_line, expected_second_line);
332    }
333}