1use crate::{
7 authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit,
8};
9use alloy_eips::{BlockNumberOrTag, Typed2718};
10use alloy_primitives::{Bytes, B256};
11use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
12use alloy_rpc_client::ClientBuilder;
13use alloy_rpc_types_engine::{
14 ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
15 PayloadAttributes,
16};
17use alloy_transport::layers::RetryBackoffLayer;
18use clap::Parser;
19use eyre::Context;
20use reqwest::Url;
21use reth_cli_runner::CliContext;
22use reth_rpc_api::TestingBuildBlockRequestV1;
23use std::future::Future;
24use tokio::sync::mpsc;
25use tracing::{info, warn};
26
27#[derive(Debug, Clone)]
29pub struct RawTransaction {
30 pub gas_used: u64,
32 pub tx_type: u8,
34 pub raw: Bytes,
36}
37
38pub trait TransactionSource {
42 fn fetch_block_transactions(
47 &self,
48 block_number: u64,
49 ) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
50}
51
52#[derive(Debug)]
54pub struct RpcTransactionSource {
55 provider: RootProvider<AnyNetwork>,
56}
57
58impl RpcTransactionSource {
59 pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
61 Self { provider }
62 }
63
64 pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
66 let client = ClientBuilder::default()
67 .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
68 .http(rpc_url.parse()?);
69 let provider = RootProvider::<AnyNetwork>::new(client);
70 Ok(Self { provider })
71 }
72}
73
74impl TransactionSource for RpcTransactionSource {
75 async fn fetch_block_transactions(
76 &self,
77 block_number: u64,
78 ) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
79 let (block, receipts) = tokio::try_join!(
81 self.provider.get_block_by_number(block_number.into()).full(),
82 self.provider.get_block_receipts(block_number.into())
83 )?;
84
85 let Some(block) = block else {
86 return Ok(None);
87 };
88
89 let Some(receipts) = receipts else {
90 return Err(eyre::eyre!("Receipts not found for block {}", block_number));
91 };
92
93 let block_gas_used = block.header.gas_used;
94
95 let mut prev_cumulative = 0u64;
97 let transactions: Vec<RawTransaction> = block
98 .transactions
99 .txns()
100 .zip(receipts.iter())
101 .map(|(tx, receipt)| {
102 let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
103 let gas_used = cumulative - prev_cumulative;
104 prev_cumulative = cumulative;
105
106 let with_encoded = tx.inner.inner.clone().into_encoded();
107 RawTransaction {
108 gas_used,
109 tx_type: tx.inner.ty(),
110 raw: with_encoded.encoded_bytes().clone(),
111 }
112 })
113 .collect();
114
115 Ok(Some((transactions, block_gas_used)))
116 }
117}
118
/// Gathers transactions from consecutive blocks of a [`TransactionSource`] until a gas
/// budget is filled.
#[derive(Debug)]
pub struct TransactionCollector<S> {
    /// Where block transactions are fetched from.
    source: S,
    /// Gas budget that a single collection run tries to fill without exceeding it.
    target_gas: u64,
}
125
126impl<S: TransactionSource> TransactionCollector<S> {
127 pub const fn new(source: S, target_gas: u64) -> Self {
129 Self { source, target_gas }
130 }
131
132 pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
137 let mut transactions: Vec<Bytes> = Vec::new();
138 let mut total_gas: u64 = 0;
139 let mut current_block = start_block;
140
141 while total_gas < self.target_gas {
142 let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
143 else {
144 warn!(block = current_block, "Block not found, stopping");
145 break;
146 };
147
148 for tx in block_txs {
149 if tx.tx_type == 3 {
151 continue;
152 }
153
154 if total_gas + tx.gas_used <= self.target_gas {
155 transactions.push(tx.raw);
156 total_gas += tx.gas_used;
157 }
158
159 if total_gas >= self.target_gas {
160 break;
161 }
162 }
163
164 current_block += 1;
165
166 let remaining_gas = self.target_gas.saturating_sub(total_gas);
168 if remaining_gas < 1_000_000 {
169 break;
170 }
171 }
172
173 info!(
174 total_txs = transactions.len(),
175 total_gas,
176 next_block = current_block,
177 "Finished collecting transactions"
178 );
179
180 Ok((transactions, total_gas, current_block))
181 }
182}
183
184#[derive(Debug, Parser)]
189pub struct Command {
190 #[arg(long, value_name = "RPC_URL")]
192 rpc_url: String,
193
194 #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
196 engine_rpc_url: String,
197
198 #[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
200 testing_rpc_url: String,
201
202 #[arg(long, value_name = "JWT_SECRET")]
204 jwt_secret: std::path::PathBuf,
205
206 #[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
210 target_gas: u64,
211
212 #[arg(long, value_name = "FROM_BLOCK")]
226 from_block: u64,
227
228 #[arg(long, default_value = "false")]
231 execute: bool,
232
233 #[arg(long, default_value = "1")]
237 count: u64,
238
239 #[arg(long, default_value = "4")]
242 prefetch_buffer: usize,
243
244 #[arg(long, value_name = "OUTPUT_DIR")]
246 output_dir: std::path::PathBuf,
247}
248
249struct BuiltPayload {
251 block_number: u64,
252 envelope: ExecutionPayloadEnvelopeV4,
253 block_hash: B256,
254 timestamp: u64,
255}
256
257impl Command {
258 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
260 info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
261
262 let jwt =
264 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
265 let jwt = JwtSecret::from_hex(jwt.trim())?;
266 let auth_url = Url::parse(&self.engine_rpc_url)?;
267
268 info!("Connecting to Engine RPC at {}", auth_url);
269 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
270 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
271 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
272
273 info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
275 let testing_client = ClientBuilder::default()
276 .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
277 .http(self.testing_rpc_url.parse()?);
278 let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
279
280 info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
282 let parent_block = auth_provider
283 .get_block_by_number(BlockNumberOrTag::Latest)
284 .await?
285 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
286
287 let parent_hash = parent_block.header.hash;
288 let parent_number = parent_block.header.number;
289 let parent_timestamp = parent_block.header.timestamp;
290
291 info!(
292 parent_hash = %parent_hash,
293 parent_number = parent_number,
294 "Using initial parent block"
295 );
296
297 std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
299 format!("Failed to create output directory: {:?}", self.output_dir)
300 })?;
301
302 let start_block = self.from_block;
303
304 if self.count > 1 {
306 self.execute_pipelined(
307 &auth_provider,
308 &testing_provider,
309 start_block,
310 parent_hash,
311 parent_timestamp,
312 )
313 .await?;
314 } else {
315 let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
317 let collector = TransactionCollector::new(tx_source, self.target_gas);
318 let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
319
320 if transactions.is_empty() {
321 return Err(eyre::eyre!("No transactions collected"));
322 }
323
324 self.execute_sequential(
325 &auth_provider,
326 &testing_provider,
327 transactions,
328 parent_hash,
329 parent_timestamp,
330 )
331 .await?;
332 }
333
334 info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
335 Ok(())
336 }
337
338 async fn execute_sequential(
340 &self,
341 auth_provider: &RootProvider<AnyNetwork>,
342 testing_provider: &RootProvider<AnyNetwork>,
343 transactions: Vec<Bytes>,
344 mut parent_hash: B256,
345 mut parent_timestamp: u64,
346 ) -> eyre::Result<()> {
347 for i in 0..self.count {
348 info!(
349 payload = i + 1,
350 total = self.count,
351 parent_hash = %parent_hash,
352 parent_timestamp = parent_timestamp,
353 "Building payload via testing_buildBlockV1"
354 );
355
356 let built = self
357 .build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
358 .await?;
359
360 self.save_payload(&built)?;
361
362 if self.execute || self.count > 1 {
363 info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
364 self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
365 info!(payload = i + 1, "Payload executed successfully");
366 }
367
368 parent_hash = built.block_hash;
369 parent_timestamp = built.timestamp;
370 }
371 Ok(())
372 }
373
374 async fn execute_pipelined(
376 &self,
377 auth_provider: &RootProvider<AnyNetwork>,
378 testing_provider: &RootProvider<AnyNetwork>,
379 start_block: u64,
380 initial_parent_hash: B256,
381 initial_parent_timestamp: u64,
382 ) -> eyre::Result<()> {
383 let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
385
386 let rpc_url = self.rpc_url.clone();
388 let target_gas = self.target_gas;
389 let count = self.count;
390
391 let fetcher_handle = tokio::spawn(async move {
392 let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
393 Ok(source) => source,
394 Err(e) => {
395 warn!(error = %e, "Failed to create transaction source");
396 return;
397 }
398 };
399
400 let collector = TransactionCollector::new(tx_source, target_gas);
401 let mut current_block = start_block;
402
403 for payload_idx in 0..count {
404 match collector.collect(current_block).await {
405 Ok((transactions, total_gas, next_block)) => {
406 info!(
407 payload = payload_idx + 1,
408 tx_count = transactions.len(),
409 total_gas,
410 blocks = format!("{}..{}", current_block, next_block),
411 "Fetched transactions"
412 );
413 current_block = next_block;
414
415 if tx_sender.send(transactions).await.is_err() {
416 break;
417 }
418 }
419 Err(e) => {
420 warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
421 break;
422 }
423 }
424 }
425 });
426
427 let mut parent_hash = initial_parent_hash;
428 let mut parent_timestamp = initial_parent_timestamp;
429 let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
430
431 for i in 0..self.count {
432 let is_last = i == self.count - 1;
433
434 let current_payload = if let Some(handle) = pending_build.take() {
436 handle.await??
437 } else {
438 let transactions = tx_receiver
440 .recv()
441 .await
442 .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
443
444 if transactions.is_empty() {
445 return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
446 }
447
448 info!(
449 payload = i + 1,
450 total = self.count,
451 parent_hash = %parent_hash,
452 parent_timestamp = parent_timestamp,
453 tx_count = transactions.len(),
454 "Building payload via testing_buildBlockV1"
455 );
456 self.build_payload(
457 testing_provider,
458 &transactions,
459 i,
460 parent_hash,
461 parent_timestamp,
462 )
463 .await?
464 };
465
466 self.save_payload(¤t_payload)?;
467
468 let current_block_hash = current_payload.block_hash;
469 let current_timestamp = current_payload.timestamp;
470
471 info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
473 self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
474 info!(payload = i + 1, "Payload executed successfully");
475
476 if !is_last {
478 let next_transactions = tx_receiver
480 .recv()
481 .await
482 .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
483
484 if next_transactions.is_empty() {
485 return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
486 }
487
488 let testing_provider = testing_provider.clone();
489 let next_index = i + 1;
490 let total = self.count;
491
492 pending_build = Some(tokio::spawn(async move {
493 info!(
494 payload = next_index + 1,
495 total = total,
496 parent_hash = %current_block_hash,
497 parent_timestamp = current_timestamp,
498 tx_count = next_transactions.len(),
499 "Building payload via testing_buildBlockV1"
500 );
501
502 Self::build_payload_static(
503 &testing_provider,
504 &next_transactions,
505 next_index,
506 current_block_hash,
507 current_timestamp,
508 )
509 .await
510 }));
511 }
512
513 parent_hash = current_block_hash;
514 parent_timestamp = current_timestamp;
515 }
516
517 drop(tx_receiver);
519 let _ = fetcher_handle.await;
520
521 Ok(())
522 }
523
524 async fn build_payload(
526 &self,
527 testing_provider: &RootProvider<AnyNetwork>,
528 transactions: &[Bytes],
529 index: u64,
530 parent_hash: B256,
531 parent_timestamp: u64,
532 ) -> eyre::Result<BuiltPayload> {
533 Self::build_payload_static(
534 testing_provider,
535 transactions,
536 index,
537 parent_hash,
538 parent_timestamp,
539 )
540 .await
541 }
542
543 async fn build_payload_static(
545 testing_provider: &RootProvider<AnyNetwork>,
546 transactions: &[Bytes],
547 index: u64,
548 parent_hash: B256,
549 parent_timestamp: u64,
550 ) -> eyre::Result<BuiltPayload> {
551 let request = TestingBuildBlockRequestV1 {
552 parent_block_hash: parent_hash,
553 payload_attributes: PayloadAttributes {
554 timestamp: parent_timestamp + 12,
555 prev_randao: B256::ZERO,
556 suggested_fee_recipient: alloy_primitives::Address::ZERO,
557 withdrawals: Some(vec![]),
558 parent_beacon_block_root: Some(B256::ZERO),
559 },
560 transactions: transactions.to_vec(),
561 extra_data: None,
562 };
563
564 let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
565 info!(
566 payload = index + 1,
567 tx_count = transactions.len(),
568 total_tx_bytes = total_tx_bytes,
569 parent_hash = %parent_hash,
570 "Sending to testing_buildBlockV1"
571 );
572 let envelope: ExecutionPayloadEnvelopeV5 =
573 testing_provider.client().request("testing_buildBlockV1", [request]).await?;
574
575 let v4_envelope = envelope.try_into_v4()?;
576
577 let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
578 let block_hash = inner.block_hash;
579 let block_number = inner.block_number;
580 let timestamp = inner.timestamp;
581
582 Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
583 }
584
585 fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
587 let filename = format!("payload_block_{}.json", payload.block_number);
588 let filepath = self.output_dir.join(&filename);
589 let json = serde_json::to_string_pretty(&payload.envelope)?;
590 std::fs::write(&filepath, &json)
591 .wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
592 info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
593 Ok(())
594 }
595
596 async fn execute_payload_v4(
597 &self,
598 provider: &RootProvider<AnyNetwork>,
599 envelope: ExecutionPayloadEnvelopeV4,
600 parent_hash: B256,
601 ) -> eyre::Result<()> {
602 let block_hash =
603 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
604
605 let status = provider
606 .new_payload_v4(
607 envelope.envelope_inner.execution_payload,
608 vec![],
609 B256::ZERO,
610 envelope.execution_requests.to_vec(),
611 )
612 .await?;
613
614 if !status.is_valid() {
615 return Err(eyre::eyre!("Payload rejected: {:?}", status));
616 }
617
618 let fcu_state = ForkchoiceState {
619 head_block_hash: block_hash,
620 safe_block_hash: parent_hash,
621 finalized_block_hash: parent_hash,
622 };
623
624 let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
625
626 if !fcu_result.is_valid() {
627 return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
628 }
629
630 Ok(())
631 }
632}