1use crate::{
7 authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit,
8};
9use alloy_eips::{BlockNumberOrTag, Typed2718};
10use alloy_primitives::{Bytes, B256};
11use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
12use alloy_rpc_client::ClientBuilder;
13use alloy_rpc_types_engine::{
14 ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
15 PayloadAttributes,
16};
17use alloy_transport::layers::RetryBackoffLayer;
18use clap::Parser;
19use eyre::Context;
20use reqwest::Url;
21use reth_cli_runner::CliContext;
22use reth_rpc_api::TestingBuildBlockRequestV1;
23use std::future::Future;
24use tokio::sync::mpsc;
25use tracing::{info, warn};
26
27#[derive(Debug, Clone)]
29pub struct RawTransaction {
30 pub gas_used: u64,
32 pub tx_type: u8,
34 pub raw: Bytes,
36}
37
38pub trait TransactionSource {
42 fn fetch_block_transactions(
47 &self,
48 block_number: u64,
49 ) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
50}
51
52#[derive(Debug)]
54pub struct RpcTransactionSource {
55 provider: RootProvider<AnyNetwork>,
56}
57
58impl RpcTransactionSource {
59 pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
61 Self { provider }
62 }
63
64 pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
66 let client = ClientBuilder::default()
67 .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
68 .http(rpc_url.parse()?);
69 let provider = RootProvider::<AnyNetwork>::new(client);
70 Ok(Self { provider })
71 }
72}
73
74impl TransactionSource for RpcTransactionSource {
75 async fn fetch_block_transactions(
76 &self,
77 block_number: u64,
78 ) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
79 let (block, receipts) = tokio::try_join!(
81 self.provider.get_block_by_number(block_number.into()).full(),
82 self.provider.get_block_receipts(block_number.into())
83 )?;
84
85 let Some(block) = block else {
86 return Ok(None);
87 };
88
89 let Some(receipts) = receipts else {
90 return Err(eyre::eyre!("Receipts not found for block {}", block_number));
91 };
92
93 let block_gas_used = block.header.gas_used;
94
95 let mut prev_cumulative = 0u64;
97 let transactions: Vec<RawTransaction> = block
98 .transactions
99 .txns()
100 .zip(receipts.iter())
101 .map(|(tx, receipt)| {
102 let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
103 let gas_used = cumulative - prev_cumulative;
104 prev_cumulative = cumulative;
105
106 let with_encoded = tx.inner.inner.clone().into_encoded();
107 RawTransaction {
108 gas_used,
109 tx_type: tx.inner.ty(),
110 raw: with_encoded.encoded_bytes().clone(),
111 }
112 })
113 .collect();
114
115 Ok(Some((transactions, block_gas_used)))
116 }
117}
118
119#[derive(Debug)]
121pub struct TransactionCollector<S> {
122 source: S,
123 target_gas: u64,
124}
125
126impl<S: TransactionSource> TransactionCollector<S> {
127 pub const fn new(source: S, target_gas: u64) -> Self {
129 Self { source, target_gas }
130 }
131
132 pub async fn collect(&self, start_block: u64) -> eyre::Result<CollectionResult> {
137 self.collect_gas(start_block, self.target_gas).await
138 }
139
140 pub async fn collect_gas(
144 &self,
145 start_block: u64,
146 gas_target: u64,
147 ) -> eyre::Result<CollectionResult> {
148 let mut transactions: Vec<RawTransaction> = Vec::new();
149 let mut total_gas: u64 = 0;
150 let mut current_block = start_block;
151
152 while total_gas < gas_target {
153 let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
154 else {
155 warn!(target: "reth-bench", block = current_block, "Block not found, stopping");
156 break;
157 };
158
159 for tx in block_txs {
160 if tx.tx_type == 3 {
162 continue;
163 }
164
165 if total_gas + tx.gas_used <= gas_target {
166 total_gas += tx.gas_used;
167 transactions.push(tx);
168 }
169
170 if total_gas >= gas_target {
171 break;
172 }
173 }
174
175 current_block += 1;
176
177 let remaining_gas = gas_target.saturating_sub(total_gas);
179 if remaining_gas < 1_000_000 {
180 break;
181 }
182 }
183
184 info!(
185 target: "reth-bench",
186 total_txs = transactions.len(),
187 gas_sent = total_gas,
188 next_block = current_block,
189 "Finished collecting transactions"
190 );
191
192 Ok(CollectionResult { transactions, gas_sent: total_gas, next_block: current_block })
193 }
194}
195
196#[derive(Debug, Parser)]
201pub struct Command {
202 #[arg(long, value_name = "RPC_URL")]
204 rpc_url: String,
205
206 #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
208 engine_rpc_url: String,
209
210 #[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
212 testing_rpc_url: String,
213
214 #[arg(long, value_name = "JWT_SECRET")]
216 jwt_secret: std::path::PathBuf,
217
218 #[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
222 target_gas: u64,
223
224 #[arg(long, value_name = "FROM_BLOCK")]
238 from_block: u64,
239
240 #[arg(long, default_value = "false")]
243 execute: bool,
244
245 #[arg(long, default_value = "1")]
249 count: u64,
250
251 #[arg(long, default_value = "4")]
254 prefetch_buffer: usize,
255
256 #[arg(long, value_name = "OUTPUT_DIR")]
258 output_dir: std::path::PathBuf,
259}
260
261struct BuiltPayload {
263 block_number: u64,
264 envelope: ExecutionPayloadEnvelopeV4,
265 block_hash: B256,
266 timestamp: u64,
267 gas_used: u64,
269}
270
271#[derive(Debug)]
273pub struct CollectionResult {
274 pub transactions: Vec<RawTransaction>,
276 pub gas_sent: u64,
278 pub next_block: u64,
280}
281
282const MAX_BUILD_RETRIES: u32 = 5;
284const MAX_FETCH_RETRIES: u32 = 5;
286const MIN_TARGET_SLACK: u64 = 1_000_000;
288const MAX_ADDITIONAL_GAS_MULTIPLIER: u64 = 10;
290
291async fn fetch_batch_with_retry<S: TransactionSource>(
295 collector: &TransactionCollector<S>,
296 block: u64,
297) -> Option<CollectionResult> {
298 for attempt in 1..=MAX_FETCH_RETRIES {
299 match collector.collect(block).await {
300 Ok(result) => return Some(result),
301 Err(e) => {
302 if attempt == MAX_FETCH_RETRIES {
303 warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions after max retries");
304 return None;
305 }
306 warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions, retrying...");
307 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
308 }
309 }
310 }
311 None
312}
313
314enum RetryOutcome {
316 Success,
318 MaxRetries,
320 NeedMore(u64),
322}
323
324struct TxBuffer {
329 receiver: mpsc::Receiver<CollectionResult>,
330}
331
332impl TxBuffer {
333 const fn new(receiver: mpsc::Receiver<CollectionResult>) -> Self {
334 Self { receiver }
335 }
336
337 async fn take_batch(&mut self) -> Option<CollectionResult> {
339 self.receiver.recv().await
340 }
341}
342
343impl Command {
344 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
346 info!(target: "reth-bench", target_gas = self.target_gas, count = self.count, "Generating big block(s)");
347
348 let jwt =
350 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
351 let jwt = JwtSecret::from_hex(jwt.trim())?;
352 let auth_url = Url::parse(&self.engine_rpc_url)?;
353
354 info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
355 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
356 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
357 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
358
359 info!(target: "reth-bench", "Connecting to Testing RPC at {}", self.testing_rpc_url);
361 let testing_client = ClientBuilder::default()
362 .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
363 .http(self.testing_rpc_url.parse()?);
364 let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
365
366 info!(target: "reth-bench", endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
368 let parent_block = auth_provider
369 .get_block_by_number(BlockNumberOrTag::Latest)
370 .await?
371 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
372
373 let parent_hash = parent_block.header.hash;
374 let parent_number = parent_block.header.number;
375 let parent_timestamp = parent_block.header.timestamp;
376
377 info!(
378 target: "reth-bench",
379 parent_hash = %parent_hash,
380 parent_number = parent_number,
381 "Using initial parent block"
382 );
383
384 std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
386 format!("Failed to create output directory: {:?}", self.output_dir)
387 })?;
388
389 let start_block = self.from_block;
390
391 if self.count > 1 {
393 self.execute_pipelined(
394 &auth_provider,
395 &testing_provider,
396 start_block,
397 parent_hash,
398 parent_timestamp,
399 )
400 .await?;
401 } else {
402 let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
404 let collector = TransactionCollector::new(tx_source, self.target_gas);
405 let result = collector.collect(start_block).await?;
406
407 if result.transactions.is_empty() {
408 return Err(eyre::eyre!("No transactions collected"));
409 }
410
411 self.execute_sequential_with_retry(
412 &auth_provider,
413 &testing_provider,
414 &collector,
415 result,
416 parent_hash,
417 parent_timestamp,
418 )
419 .await?;
420 }
421
422 info!(target: "reth-bench", count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
423 Ok(())
424 }
425
426 async fn execute_sequential_with_retry<S: TransactionSource>(
428 &self,
429 auth_provider: &RootProvider<AnyNetwork>,
430 testing_provider: &RootProvider<AnyNetwork>,
431 collector: &TransactionCollector<S>,
432 initial_result: CollectionResult,
433 mut parent_hash: B256,
434 mut parent_timestamp: u64,
435 ) -> eyre::Result<()> {
436 let mut current_result = initial_result;
437
438 for i in 0..self.count {
439 let built = self
440 .build_with_retry(
441 testing_provider,
442 collector,
443 &mut current_result,
444 i,
445 parent_hash,
446 parent_timestamp,
447 )
448 .await?;
449
450 self.save_payload(&built)?;
451
452 if self.execute || self.count > 1 {
453 info!(target: "reth-bench", payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
454 self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
455 info!(target: "reth-bench", payload = i + 1, "Payload executed successfully");
456 }
457
458 parent_hash = built.block_hash;
459 parent_timestamp = built.timestamp;
460 }
461 Ok(())
462 }
463
464 async fn build_with_retry<S: TransactionSource>(
469 &self,
470 testing_provider: &RootProvider<AnyNetwork>,
471 collector: &TransactionCollector<S>,
472 result: &mut CollectionResult,
473 index: u64,
474 parent_hash: B256,
475 parent_timestamp: u64,
476 ) -> eyre::Result<BuiltPayload> {
477 for attempt in 1..=MAX_BUILD_RETRIES {
478 let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
479 let gas_sent = result.gas_sent;
480
481 info!(
482 target: "reth-bench",
483 payload = index + 1,
484 attempt,
485 tx_count = tx_bytes.len(),
486 gas_sent,
487 parent_hash = %parent_hash,
488 "Building payload via testing_buildBlockV1"
489 );
490
491 let built = Self::build_payload_static(
492 testing_provider,
493 &tx_bytes,
494 index,
495 parent_hash,
496 parent_timestamp,
497 )
498 .await?;
499
500 match self.check_retry_outcome(&built, index, attempt, gas_sent) {
501 RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
502 RetryOutcome::NeedMore(additional_gas) => {
503 let additional =
504 collector.collect_gas(result.next_block, additional_gas).await?;
505 result.transactions.extend(additional.transactions);
506 result.gas_sent = result.gas_sent.saturating_add(additional.gas_sent);
507 result.next_block = additional.next_block;
508 }
509 }
510 }
511
512 warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload");
513 Err(eyre::eyre!("build_with_retry exhausted retries without result"))
514 }
515
516 async fn execute_pipelined(
522 &self,
523 auth_provider: &RootProvider<AnyNetwork>,
524 testing_provider: &RootProvider<AnyNetwork>,
525 start_block: u64,
526 initial_parent_hash: B256,
527 initial_parent_timestamp: u64,
528 ) -> eyre::Result<()> {
529 let (tx_sender, tx_receiver) = mpsc::channel::<CollectionResult>(self.prefetch_buffer);
531
532 let rpc_url = self.rpc_url.clone();
534 let target_gas = self.target_gas;
535
536 let fetcher_handle = tokio::spawn(async move {
537 let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
538 Ok(source) => source,
539 Err(e) => {
540 warn!(target: "reth-bench", error = %e, "Failed to create transaction source");
541 return None;
542 }
543 };
544
545 let collector = TransactionCollector::new(tx_source, target_gas);
546 let mut current_block = start_block;
547
548 while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await {
549 if batch.transactions.is_empty() {
550 info!(target: "reth-bench", block = current_block, "Reached chain tip, stopping fetcher");
551 break;
552 }
553
554 info!(
555 target: "reth-bench",
556 tx_count = batch.transactions.len(),
557 gas_sent = batch.gas_sent,
558 blocks = format!("{}..{}", current_block, batch.next_block),
559 "Fetched transaction batch"
560 );
561 current_block = batch.next_block;
562
563 if tx_sender.send(batch).await.is_err() {
564 break;
565 }
566 }
567
568 Some(current_block)
569 });
570
571 let mut tx_buffer = TxBuffer::new(tx_receiver);
573
574 let mut parent_hash = initial_parent_hash;
575 let mut parent_timestamp = initial_parent_timestamp;
576
577 for i in 0..self.count {
578 let Some(mut result) = tx_buffer.take_batch().await else {
580 info!(
581 target: "reth-bench",
582 payloads_built = i,
583 payloads_requested = self.count,
584 "Transaction source exhausted, stopping"
585 );
586 break;
587 };
588
589 if result.transactions.is_empty() {
590 info!(
591 target: "reth-bench",
592 payloads_built = i,
593 payloads_requested = self.count,
594 "No more transactions available, stopping"
595 );
596 break;
597 }
598
599 let built = self
601 .build_with_retry_buffered(
602 testing_provider,
603 &mut tx_buffer,
604 &mut result,
605 i,
606 parent_hash,
607 parent_timestamp,
608 )
609 .await?;
610
611 self.save_payload(&built)?;
612
613 let current_block_hash = built.block_hash;
614 let current_timestamp = built.timestamp;
615
616 info!(target: "reth-bench", payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
618 self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
619 info!(target: "reth-bench", payload = i + 1, "Payload executed successfully");
620
621 parent_hash = current_block_hash;
622 parent_timestamp = current_timestamp;
623 }
624
625 drop(tx_buffer);
627 let _ = fetcher_handle.await;
628
629 Ok(())
630 }
631
632 async fn build_with_retry_buffered(
634 &self,
635 testing_provider: &RootProvider<AnyNetwork>,
636 tx_buffer: &mut TxBuffer,
637 result: &mut CollectionResult,
638 index: u64,
639 parent_hash: B256,
640 parent_timestamp: u64,
641 ) -> eyre::Result<BuiltPayload> {
642 for attempt in 1..=MAX_BUILD_RETRIES {
643 let tx_bytes: Vec<Bytes> = result.transactions.iter().map(|t| t.raw.clone()).collect();
644 let gas_sent = result.gas_sent;
645
646 info!(
647 target: "reth-bench",
648 payload = index + 1,
649 attempt,
650 tx_count = tx_bytes.len(),
651 gas_sent,
652 parent_hash = %parent_hash,
653 "Building payload via testing_buildBlockV1"
654 );
655
656 let built = Self::build_payload_static(
657 testing_provider,
658 &tx_bytes,
659 index,
660 parent_hash,
661 parent_timestamp,
662 )
663 .await?;
664
665 match self.check_retry_outcome(&built, index, attempt, gas_sent) {
666 RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built),
667 RetryOutcome::NeedMore(additional_gas) => {
668 let mut collected_gas = 0u64;
669 while collected_gas < additional_gas {
670 if let Some(batch) = tx_buffer.take_batch().await {
671 collected_gas += batch.gas_sent;
672 result.transactions.extend(batch.transactions);
673 result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent);
674 result.next_block = batch.next_block;
675 } else {
676 warn!(target: "reth-bench", "Transaction fetcher exhausted, proceeding with available transactions");
677 break;
678 }
679 }
680 }
681 }
682 }
683
684 warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload");
685 Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result"))
686 }
687
688 fn check_retry_outcome(
690 &self,
691 built: &BuiltPayload,
692 index: u64,
693 attempt: u32,
694 gas_sent: u64,
695 ) -> RetryOutcome {
696 let gas_used = built.gas_used;
697
698 if gas_used + MIN_TARGET_SLACK >= self.target_gas {
699 info!(
700 target: "reth-bench",
701 payload = index + 1,
702 gas_used,
703 target_gas = self.target_gas,
704 attempts = attempt,
705 "Payload built successfully"
706 );
707 return RetryOutcome::Success;
708 }
709
710 if attempt == MAX_BUILD_RETRIES {
711 warn!(
712 target: "reth-bench",
713 payload = index + 1,
714 gas_used,
715 target_gas = self.target_gas,
716 gas_sent,
717 "Underfilled after max retries, accepting payload"
718 );
719 return RetryOutcome::MaxRetries;
720 }
721
722 if gas_used == 0 {
723 warn!(
724 target: "reth-bench",
725 payload = index + 1,
726 "Zero gas used in payload, requesting fixed chunk of additional transactions"
727 );
728 return RetryOutcome::NeedMore(self.target_gas);
729 }
730
731 let gas_sent_needed_total =
732 (self.target_gas as u128 * gas_sent as u128).div_ceil(gas_used as u128) as u64;
733 let additional = gas_sent_needed_total.saturating_sub(gas_sent);
734 let additional = additional.min(self.target_gas * MAX_ADDITIONAL_GAS_MULTIPLIER);
735
736 if additional == 0 {
737 info!(
738 target: "reth-bench",
739 payload = index + 1,
740 gas_used,
741 target_gas = self.target_gas,
742 "No additional transactions needed based on ratio"
743 );
744 return RetryOutcome::Success;
745 }
746
747 let ratio = gas_used as f64 / gas_sent as f64;
748 info!(
749 target: "reth-bench",
750 payload = index + 1,
751 gas_used,
752 gas_sent,
753 ratio = format!("{:.4}", ratio),
754 additional_gas = additional,
755 "Underfilled, collecting more transactions for retry"
756 );
757 RetryOutcome::NeedMore(additional)
758 }
759
760 async fn build_payload_static(
762 testing_provider: &RootProvider<AnyNetwork>,
763 transactions: &[Bytes],
764 index: u64,
765 parent_hash: B256,
766 parent_timestamp: u64,
767 ) -> eyre::Result<BuiltPayload> {
768 let request = TestingBuildBlockRequestV1 {
769 parent_block_hash: parent_hash,
770 payload_attributes: PayloadAttributes {
771 timestamp: parent_timestamp + 12,
772 prev_randao: B256::ZERO,
773 suggested_fee_recipient: alloy_primitives::Address::ZERO,
774 withdrawals: Some(vec![]),
775 parent_beacon_block_root: Some(B256::ZERO),
776 },
777 transactions: transactions.to_vec(),
778 extra_data: None,
779 };
780
781 let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
782 info!(
783 target: "reth-bench",
784 payload = index + 1,
785 tx_count = transactions.len(),
786 total_tx_bytes = total_tx_bytes,
787 parent_hash = %parent_hash,
788 "Sending to testing_buildBlockV1"
789 );
790 let envelope: ExecutionPayloadEnvelopeV5 =
791 testing_provider.client().request("testing_buildBlockV1", [request]).await?;
792
793 let v4_envelope = envelope.try_into_v4()?;
794
795 let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
796 let block_hash = inner.block_hash;
797 let block_number = inner.block_number;
798 let timestamp = inner.timestamp;
799 let gas_used = inner.gas_used;
800
801 Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp, gas_used })
802 }
803
804 fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
806 let filename = format!("payload_block_{}.json", payload.block_number);
807 let filepath = self.output_dir.join(&filename);
808 let json = serde_json::to_string_pretty(&payload.envelope)?;
809 std::fs::write(&filepath, &json)
810 .wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
811 info!(target: "reth-bench", block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
812 Ok(())
813 }
814
815 async fn execute_payload_v4(
816 &self,
817 provider: &RootProvider<AnyNetwork>,
818 envelope: ExecutionPayloadEnvelopeV4,
819 parent_hash: B256,
820 ) -> eyre::Result<()> {
821 let block_hash =
822 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
823
824 let status = provider
825 .new_payload_v4(
826 envelope.envelope_inner.execution_payload,
827 vec![],
828 B256::ZERO,
829 envelope.execution_requests.to_vec(),
830 )
831 .await?;
832
833 if !status.is_valid() {
834 return Err(eyre::eyre!("Payload rejected: {:?}", status));
835 }
836
837 let fcu_state = ForkchoiceState {
838 head_block_hash: block_hash,
839 safe_block_hash: parent_hash,
840 finalized_block_hash: parent_hash,
841 };
842
843 let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
844
845 if !fcu_result.is_valid() {
846 return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
847 }
848
849 Ok(())
850 }
851}