1use alloy_eips::BlockId;
4use alloy_primitives::{map::HashSet, Bytes, TxHash, B256};
5use alloy_rpc_types_eth::{transaction::TransactionRequest, Index};
6use alloy_rpc_types_trace::{
7 filter::TraceFilter,
8 opcode::BlockOpcodeGas,
9 parity::{LocalizedTransactionTrace, TraceResults, TraceType},
10 tracerequest::TraceCallRequest,
11};
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::Error as RpcError;
14use reth_rpc_api::clients::TraceApiClient;
15use std::{
16 pin::Pin,
17 task::{Context, Poll},
18};
19
20type RawTransactionTraceResult<'a> =
22 Pin<Box<dyn Stream<Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>> + 'a>>;
23
24pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
26
27pub type TraceBlockOpCodeGasResult = Result<(BlockOpcodeGas, BlockId), (RpcError, BlockId)>;
29
30pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
32
33pub type CallManyTraceResult = Result<
35 (Vec<TraceResults>, Vec<(TransactionRequest, HashSet<TraceType>)>),
36 (RpcError, Vec<(TransactionRequest, HashSet<TraceType>)>),
37>;
38
39pub type TraceGetResult =
42 Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
43
44pub type TraceFilterResult =
46 Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
47
48pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
50
51pub trait TraceApiExt {
53 type Provider;
55
56 fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
60 where
61 I: IntoIterator<Item = B>,
62 B: Into<BlockId>;
63
64 fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
68 where
69 I: IntoIterator<Item = B>,
70 B: Into<BlockId>;
71
72 fn trace_block_opcode_gas_unordered<I, B>(
76 &self,
77 params: I,
78 n: usize,
79 ) -> TraceBlockOpcodeGasStream<'_>
80 where
81 I: IntoIterator<Item = B>,
82 B: Into<BlockId>;
83
84 fn replay_transactions<I>(
88 &self,
89 tx_hashes: I,
90 trace_types: HashSet<TraceType>,
91 ) -> ReplayTransactionStream<'_>
92 where
93 I: IntoIterator<Item = TxHash>;
94
95 fn trace_raw_transaction_stream(
97 &self,
98 data: Bytes,
99 trace_types: HashSet<TraceType>,
100 block_id: Option<BlockId>,
101 ) -> RawTransactionTraceStream<'_>;
102
103 fn trace_call_many_stream<I>(
106 &self,
107 calls: I,
108 block_id: Option<BlockId>,
109 ) -> CallManyTraceStream<'_>
110 where
111 I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>;
112
113 fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
115 where
116 I: IntoIterator<Item = Index>;
117
118 fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
120 where
121 I: IntoIterator<Item = TraceFilter>;
122
123 fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
125}
126#[must_use = "streams do nothing unless polled"]
128pub struct TraceCallStream<'a> {
129 stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
130}
131
132impl Stream for TraceCallStream<'_> {
133 type Item = TraceCallResult;
134
135 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 self.stream.as_mut().poll_next(cx)
137 }
138}
139
140impl std::fmt::Debug for TraceCallStream<'_> {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct("TraceCallStream").finish()
143 }
144}
145
146#[must_use = "streams do nothing unless polled"]
148pub struct TraceFilterStream<'a> {
149 stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
150}
151
152impl Stream for TraceFilterStream<'_> {
153 type Item = TraceFilterResult;
154
155 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 self.stream.as_mut().poll_next(cx)
158 }
159}
160
161impl std::fmt::Debug for TraceFilterStream<'_> {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("TraceFilterStream").finish_non_exhaustive()
165 }
166}
167
168#[must_use = "streams do nothing unless polled"]
171pub struct TraceGetStream<'a> {
172 stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
173}
174
175impl Stream for TraceGetStream<'_> {
176 type Item = TraceGetResult;
177
178 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 self.stream.as_mut().poll_next(cx)
181 }
182}
183
184impl std::fmt::Debug for TraceGetStream<'_> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("TraceGetStream").finish_non_exhaustive()
187 }
188}
189
190#[must_use = "streams do nothing unless polled"]
194pub struct CallManyTraceStream<'a> {
195 stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
196}
197
198impl Stream for CallManyTraceStream<'_> {
199 type Item = CallManyTraceResult;
200
201 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203 self.stream.as_mut().poll_next(cx)
204 }
205}
206
207impl std::fmt::Debug for CallManyTraceStream<'_> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 f.debug_struct("CallManyTraceStream").finish()
210 }
211}
212
213#[must_use = "streams do nothing unless polled"]
215pub struct RawTransactionTraceStream<'a> {
216 stream: RawTransactionTraceResult<'a>,
217}
218
219impl Stream for RawTransactionTraceStream<'_> {
220 type Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>;
221
222 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223 self.stream.as_mut().poll_next(cx)
224 }
225}
226
227impl std::fmt::Debug for RawTransactionTraceStream<'_> {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("RawTransactionTraceStream").finish()
230 }
231}
232
233#[must_use = "streams do nothing unless polled"]
235pub struct ReplayTransactionStream<'a> {
236 stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
237}
238
239impl Stream for ReplayTransactionStream<'_> {
240 type Item = ReplayTransactionResult;
241
242 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 self.stream.as_mut().poll_next(cx)
244 }
245}
246
247impl std::fmt::Debug for ReplayTransactionStream<'_> {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 f.debug_struct("ReplayTransactionStream").finish()
250 }
251}
252
253impl<T: TraceApiClient + Sync> TraceApiExt for T {
254 type Provider = T;
255
256 fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
257 where
258 I: IntoIterator<Item = B>,
259 B: Into<BlockId>,
260 {
261 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
262 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
263 match self.trace_block(block).await {
264 Ok(result) => Ok((result.unwrap_or_default(), block)),
265 Err(err) => Err((err, block)),
266 }
267 }))
268 .buffered(n);
269 TraceBlockStream { stream: Box::pin(stream) }
270 }
271
272 fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
273 where
274 I: IntoIterator<Item = B>,
275 B: Into<BlockId>,
276 {
277 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
278 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
279 match self.trace_block(block).await {
280 Ok(result) => Ok((result.unwrap_or_default(), block)),
281 Err(err) => Err((err, block)),
282 }
283 }))
284 .buffer_unordered(n);
285 TraceBlockStream { stream: Box::pin(stream) }
286 }
287
288 fn trace_block_opcode_gas_unordered<I, B>(
289 &self,
290 params: I,
291 n: usize,
292 ) -> TraceBlockOpcodeGasStream<'_>
293 where
294 I: IntoIterator<Item = B>,
295 B: Into<BlockId>,
296 {
297 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
298 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
299 match self.trace_block_opcode_gas(block).await {
300 Ok(result) => Ok((result.unwrap(), block)),
301 Err(err) => Err((err, block)),
302 }
303 }))
304 .buffered(n);
305 TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
306 }
307
308 fn replay_transactions<I>(
309 &self,
310 tx_hashes: I,
311 trace_types: HashSet<TraceType>,
312 ) -> ReplayTransactionStream<'_>
313 where
314 I: IntoIterator<Item = TxHash>,
315 {
316 let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
317 let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
318 let trace_types_clone = trace_types.clone(); async move {
320 match self.replay_transaction(hash, trace_types_clone).await {
321 Ok(result) => Ok((result, hash)),
322 Err(err) => Err((err, hash)),
323 }
324 }
325 }))
326 .buffered(10);
327 ReplayTransactionStream { stream: Box::pin(stream) }
328 }
329
330 fn trace_raw_transaction_stream(
331 &self,
332 data: Bytes,
333 trace_types: HashSet<TraceType>,
334 block_id: Option<BlockId>,
335 ) -> RawTransactionTraceStream<'_> {
336 let stream = futures::stream::once(async move {
337 match self.trace_raw_transaction(data.clone(), trace_types, block_id).await {
338 Ok(result) => Ok((result, data)),
339 Err(err) => Err((err, data)),
340 }
341 });
342 RawTransactionTraceStream { stream: Box::pin(stream) }
343 }
344
345 fn trace_call_many_stream<I>(
346 &self,
347 calls: I,
348 block_id: Option<BlockId>,
349 ) -> CallManyTraceStream<'_>
350 where
351 I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>,
352 {
353 let call_set = calls.into_iter().collect::<Vec<_>>();
354 let stream = futures::stream::once(async move {
355 match self.trace_call_many(call_set.clone(), block_id).await {
356 Ok(results) => Ok((results, call_set)),
357 Err(err) => Err((err, call_set)),
358 }
359 });
360 CallManyTraceStream { stream: Box::pin(stream) }
361 }
362
363 fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
364 where
365 I: IntoIterator<Item = Index>,
366 {
367 let index_list = indices.into_iter().collect::<Vec<_>>();
368 let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
369 match self.trace_get(hash, vec![index]).await {
370 Ok(result) => Ok((result, hash, vec![index])),
371 Err(err) => Err((err, hash, vec![index])),
372 }
373 }))
374 .buffered(10);
375 TraceGetStream { stream: Box::pin(stream) }
376 }
377
378 fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
379 where
380 I: IntoIterator<Item = TraceFilter>,
381 {
382 let filter_list = filters.into_iter().collect::<Vec<_>>();
383 let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
384 match self.trace_filter(filter.clone()).await {
385 Ok(result) => Ok((result, filter)),
386 Err(err) => Err((err, filter)),
387 }
388 }))
389 .buffered(10);
390 TraceFilterStream { stream: Box::pin(stream) }
391 }
392
393 fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
394 let stream = futures::stream::once(async move {
395 match self
396 .trace_call(
397 request.call.clone(),
398 request.trace_types.clone(),
399 request.block_id,
400 request.state_overrides.clone(),
401 request.block_overrides.clone(),
402 )
403 .await
404 {
405 Ok(result) => Ok(result),
406 Err(err) => Err((err, request)),
407 }
408 });
409 TraceCallStream { stream: Box::pin(stream) }
410 }
411}
412
413#[must_use = "streams do nothing unless polled"]
415pub struct TraceBlockStream<'a> {
416 stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
417}
418
419impl TraceBlockStream<'_> {
420 pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
422 loop {
423 match self.next().await? {
424 Ok(_) => {}
425 Err(err) => return Some(err),
426 }
427 }
428 }
429}
430
431impl Stream for TraceBlockStream<'_> {
432 type Item = TraceBlockResult;
433
434 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
435 self.stream.as_mut().poll_next(cx)
436 }
437}
438
439impl std::fmt::Debug for TraceBlockStream<'_> {
440 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441 f.debug_struct("TraceBlockStream").finish_non_exhaustive()
442 }
443}
444
445#[must_use = "streams do nothing unless polled"]
447pub struct TraceBlockOpcodeGasStream<'a> {
448 stream: Pin<Box<dyn Stream<Item = TraceBlockOpCodeGasResult> + 'a>>,
449}
450
451impl TraceBlockOpcodeGasStream<'_> {
452 pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
454 loop {
455 match self.next().await? {
456 Ok(_) => {}
457 Err(err) => return Some(err),
458 }
459 }
460 }
461}
462
463impl Stream for TraceBlockOpcodeGasStream<'_> {
464 type Item = TraceBlockOpCodeGasResult;
465
466 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467 self.stream.as_mut().poll_next(cx)
468 }
469}
470
471impl std::fmt::Debug for TraceBlockOpcodeGasStream<'_> {
472 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473 f.debug_struct("TraceBlockOpcodeGasStream").finish_non_exhaustive()
474 }
475}
476
477#[derive(Debug)]
485pub struct RpcComparer<C1, C2>
486where
487 C1: TraceApiExt,
488 C2: TraceApiExt,
489{
490 client1: C1,
491 client2: C2,
492}
493impl<C1, C2> RpcComparer<C1, C2>
494where
495 C1: TraceApiExt,
496 C2: TraceApiExt,
497{
498 pub const fn new(client1: C1, client2: C2) -> Self {
508 Self { client1, client2 }
509 }
510
511 pub async fn compare_trace_block_responses(&self, block_ids: Vec<BlockId>) {
517 let stream1 = self.client1.trace_block_buffered(block_ids.clone(), 2);
518 let stream2 = self.client2.trace_block_buffered(block_ids, 2);
519
520 let mut zipped_streams = stream1.zip(stream2);
521
522 while let Some((result1, result2)) = zipped_streams.next().await {
523 match (result1, result2) {
524 (Ok((ref traces1_data, ref block1)), Ok((ref traces2_data, ref block2))) => {
525 similar_asserts::assert_eq!(
526 traces1_data,
527 traces2_data,
528 "Mismatch in traces for block: {:?}",
529 block1
530 );
531 assert_eq!(block1, block2, "Mismatch in block ids.");
532 }
533 (Err((ref err1, ref block1)), Err((ref err2, ref block2))) => {
534 assert_eq!(
535 format!("{err1:?}"),
536 format!("{err2:?}"),
537 "Different errors for block: {block1:?}"
538 );
539 assert_eq!(block1, block2, "Mismatch in block ids.");
540 }
541 _ => panic!("One client returned Ok while the other returned Err."),
542 }
543 }
544 }
545
546 pub async fn compare_replay_transaction_responses(
548 &self,
549 transaction_hashes: Vec<TxHash>,
550 trace_types: HashSet<TraceType>,
551 ) {
552 let stream1 =
553 self.client1.replay_transactions(transaction_hashes.clone(), trace_types.clone());
554 let stream2 = self.client2.replay_transactions(transaction_hashes, trace_types);
555
556 let mut zipped_streams = stream1.zip(stream2);
557
558 while let Some((result1, result2)) = zipped_streams.next().await {
559 match (result1, result2) {
560 (Ok((ref trace1_data, ref tx_hash1)), Ok((ref trace2_data, ref tx_hash2))) => {
561 similar_asserts::assert_eq!(
562 trace1_data,
563 trace2_data,
564 "Mismatch in trace results for transaction: {tx_hash1:?}",
565 );
566 assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
567 }
568 (Err((ref err1, ref tx_hash1)), Err((ref err2, ref tx_hash2))) => {
569 assert_eq!(
570 format!("{err1:?}"),
571 format!("{err2:?}"),
572 "Different errors for transaction: {tx_hash1:?}",
573 );
574 assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
575 }
576 _ => panic!("One client returned Ok while the other returned Err."),
577 }
578 }
579 }
580}
581#[cfg(test)]
582mod tests {
583 use super::*;
584 use alloy_eips::BlockNumberOrTag;
585 use alloy_rpc_types_trace::filter::TraceFilterMode;
586 use jsonrpsee::http_client::HttpClientBuilder;
587
588 const fn assert_is_stream<St: Stream>(_: &St) {}
589
590 #[tokio::test]
591 async fn can_create_block_stream() {
592 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
593 let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
594 let stream = client.trace_block_buffered(block, 2);
595 assert_is_stream(&stream);
596 }
597
598 #[tokio::test]
599 #[ignore]
600 async fn can_create_replay_transaction_stream() {
601 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
602
603 let transactions = vec![
605 "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap(),
606 "0xea2817f1aeeb587b82f4ab87a6dbd3560fc35ed28de1be280cb40b2a24ab48bb".parse().unwrap(),
607 ];
608
609 let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
610
611 let mut stream = client.replay_transactions(transactions, trace_types);
612 let mut successes = 0;
613 let mut failures = 0;
614
615 assert_is_stream(&stream);
616
617 while let Some(result) = stream.next().await {
618 match result {
619 Ok((trace_result, tx_hash)) => {
620 println!("Success for tx_hash {tx_hash:?}: {trace_result:?}");
621 successes += 1;
622 }
623 Err((error, tx_hash)) => {
624 println!("Error for tx_hash {tx_hash:?}: {error:?}");
625 failures += 1;
626 }
627 }
628 }
629
630 println!("Total successes: {successes}");
631 println!("Total failures: {failures}");
632 }
633
634 #[tokio::test]
635 #[ignore]
636 async fn can_create_trace_call_many_stream() {
637 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
638
639 let call_request_1 = TransactionRequest::default();
640 let call_request_2 = TransactionRequest::default();
641 let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
642 let calls = vec![(call_request_1, trace_types.clone()), (call_request_2, trace_types)];
643
644 let mut stream = client.trace_call_many_stream(calls, None);
645
646 assert_is_stream(&stream);
647
648 while let Some(result) = stream.next().await {
649 match result {
650 Ok(trace_result) => {
651 println!("Success: {trace_result:?}");
652 }
653 Err(error) => {
654 println!("Error: {error:?}");
655 }
656 }
657 }
658 }
659 #[tokio::test]
660 #[ignore]
661 async fn can_create_trace_get_stream() {
662 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
663
664 let tx_hash: B256 = "".parse().unwrap();
665
666 let indices: Vec<Index> = vec![Index::from(0)];
667
668 let mut stream = client.trace_get_stream(tx_hash, indices);
669
670 while let Some(result) = stream.next().await {
671 match result {
672 Ok(trace) => {
673 println!("Received trace: {trace:?}");
674 }
675 Err(e) => {
676 println!("Error fetching trace: {e:?}");
677 }
678 }
679 }
680 }
681
682 #[tokio::test]
683 #[ignore]
684 async fn can_create_trace_filter() {
685 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
686
687 let filter = TraceFilter {
688 from_block: None,
689 to_block: None,
690 from_address: Vec::new(),
691 to_address: Vec::new(),
692 mode: TraceFilterMode::Union,
693 after: None,
694 count: None,
695 };
696
697 let filters = vec![filter];
698 let mut stream = client.trace_filter_stream(filters);
699
700 while let Some(result) = stream.next().await {
701 match result {
702 Ok(trace) => {
703 println!("Received trace: {trace:?}");
704 }
705 Err(e) => {
706 println!("Error fetching trace: {e:?}");
707 }
708 }
709 }
710 }
711
712 #[tokio::test]
713 #[ignore]
714 async fn can_create_trace_call_stream() {
715 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
716
717 let trace_call_request = TraceCallRequest::default();
718
719 let mut stream = client.trace_call_stream(trace_call_request);
720 let mut successes = 0;
721 let mut failures = 0;
722
723 assert_is_stream(&stream);
724
725 while let Some(result) = stream.next().await {
726 match result {
727 Ok(trace_result) => {
728 println!("Success: {trace_result:?}");
729 successes += 1;
730 }
731 Err((error, request)) => {
732 println!("Error for request {request:?}: {error:?}");
733 failures += 1;
734 }
735 }
736 }
737
738 println!("Total successes: {successes}");
739 println!("Total failures: {failures}");
740 }
741
742 #[tokio::test]
743 #[ignore]
744 async fn block_opcode_gas_stream() {
745 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
746 let block = vec![BlockNumberOrTag::Latest];
747 let mut stream = client.trace_block_opcode_gas_unordered(block, 2);
748 assert_is_stream(&stream);
749 let _opcodes = stream.next().await.unwrap();
750 }
751}