1use crate::{
8 errors::{EthHandshakeError, EthStreamError},
9 handshake::EthereumEthHandshake,
10 message::{EthBroadcastMessage, ProtocolBroadcastMessage},
11 p2pstream::HANDSHAKE_TIMEOUT,
12 CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
13 UnifiedStatus,
14};
15use alloy_primitives::bytes::{Bytes, BytesMut};
16use alloy_rlp::Encodable;
17use futures::{ready, Sink, SinkExt};
18use pin_project::pin_project;
19use reth_eth_wire_types::{NetworkPrimitives, RawCapabilityMessage};
20use reth_ethereum_forks::ForkFilter;
21use std::{
22 future::Future,
23 pin::Pin,
24 task::{Context, Poll},
25 time::Duration,
26};
27use tokio::time::timeout;
28use tokio_stream::Stream;
29use tracing::{debug, trace};
30
/// Largest message, in bytes, that [`EthStreamInner::decode_message`] accepts (10 MiB).
///
/// Anything longer is rejected with [`EthStreamError::MessageTooBig`] before decoding.
pub const MAX_MESSAGE_SIZE: usize = 10 << 20;

/// Size limit of 500 KiB.
///
/// NOTE(review): going by the name this caps status messages; it is not referenced in this
/// part of the file, so confirm against its users.
pub(crate) const MAX_STATUS_SIZE: usize = 500 << 10;
37
38#[pin_project]
41#[derive(Debug)]
42pub struct UnauthedEthStream<S> {
43 #[pin]
44 inner: S,
45}
46
47impl<S> UnauthedEthStream<S> {
48 pub const fn new(inner: S) -> Self {
50 Self { inner }
51 }
52
53 pub fn into_inner(self) -> S {
55 self.inner
56 }
57}
58
59impl<S, E> UnauthedEthStream<S>
60where
61 S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
62 EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
63{
64 pub async fn handshake<N: NetworkPrimitives>(
71 self,
72 status: UnifiedStatus,
73 fork_filter: ForkFilter,
74 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
75 self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await
76 }
77
78 pub async fn handshake_with_timeout<N: NetworkPrimitives>(
80 self,
81 status: UnifiedStatus,
82 fork_filter: ForkFilter,
83 timeout_limit: Duration,
84 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
85 timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter))
86 .await
87 .map_err(|_| EthStreamError::StreamTimeout)?
88 }
89
90 pub async fn handshake_without_timeout<N: NetworkPrimitives>(
92 mut self,
93 status: UnifiedStatus,
94 fork_filter: ForkFilter,
95 ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
96 trace!(
97 status = %status.into_message(),
98 "sending eth status to peer"
99 );
100 let their_status =
101 EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
102
103 let stream = EthStream::new(status.version, self.inner);
106
107 Ok((stream, their_status))
108 }
109}
110
111#[derive(Debug)]
113pub struct EthStreamInner<N> {
114 version: EthVersion,
116 _pd: std::marker::PhantomData<N>,
117}
118
119impl<N> EthStreamInner<N>
120where
121 N: NetworkPrimitives,
122{
123 pub const fn new(version: EthVersion) -> Self {
125 Self { version, _pd: std::marker::PhantomData }
126 }
127
128 #[inline]
130 pub const fn version(&self) -> EthVersion {
131 self.version
132 }
133
134 pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
136 if bytes.len() > MAX_MESSAGE_SIZE {
137 return Err(EthStreamError::MessageTooBig(bytes.len()));
138 }
139
140 let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
141 Ok(m) => m,
142 Err(err) => {
143 let msg = if bytes.len() > 50 {
144 format!("{:02x?}...{:x?}", &bytes[..10], &bytes[bytes.len() - 10..])
145 } else {
146 format!("{bytes:02x?}")
147 };
148 debug!(
149 version=?self.version,
150 %msg,
151 "failed to decode protocol message"
152 );
153 return Err(EthStreamError::InvalidMessage(err));
154 }
155 };
156
157 if matches!(msg.message, EthMessage::Status(_)) {
158 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
159 }
160
161 Ok(msg.message)
162 }
163
164 pub fn encode_message(&self, item: EthMessage<N>) -> Result<Bytes, EthStreamError> {
168 if matches!(item, EthMessage::Status(_)) {
169 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
170 }
171
172 Ok(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))
173 }
174}
175
176#[pin_project]
179#[derive(Debug)]
180pub struct EthStream<S, N = EthNetworkPrimitives> {
181 eth: EthStreamInner<N>,
183 #[pin]
184 inner: S,
185}
186
187impl<S, N: NetworkPrimitives> EthStream<S, N> {
188 #[inline]
191 pub const fn new(version: EthVersion, inner: S) -> Self {
192 Self { eth: EthStreamInner::new(version), inner }
193 }
194
195 #[inline]
197 pub const fn version(&self) -> EthVersion {
198 self.eth.version()
199 }
200
201 #[inline]
203 pub const fn inner(&self) -> &S {
204 &self.inner
205 }
206
207 #[inline]
209 pub const fn inner_mut(&mut self) -> &mut S {
210 &mut self.inner
211 }
212
213 #[inline]
215 pub fn into_inner(self) -> S {
216 self.inner
217 }
218}
219
220impl<S, E, N> EthStream<S, N>
221where
222 S: Sink<Bytes, Error = E> + Unpin,
223 EthStreamError: From<E>,
224 N: NetworkPrimitives,
225{
226 pub fn start_send_broadcast(
228 &mut self,
229 item: EthBroadcastMessage<N>,
230 ) -> Result<(), EthStreamError> {
231 self.inner.start_send_unpin(Bytes::from(alloy_rlp::encode(
232 ProtocolBroadcastMessage::from(item),
233 )))?;
234
235 Ok(())
236 }
237
238 pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
240 let mut bytes = Vec::with_capacity(msg.payload.len() + 1);
241 msg.id.encode(&mut bytes);
242 bytes.extend_from_slice(&msg.payload);
243
244 self.inner.start_send_unpin(bytes.into())?;
245 Ok(())
246 }
247}
248
249impl<S, E, N> Stream for EthStream<S, N>
250where
251 S: Stream<Item = Result<BytesMut, E>> + Unpin,
252 EthStreamError: From<E>,
253 N: NetworkPrimitives,
254{
255 type Item = Result<EthMessage<N>, EthStreamError>;
256
257 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258 let this = self.project();
259 let res = ready!(this.inner.poll_next(cx));
260
261 match res {
262 Some(Ok(bytes)) => Poll::Ready(Some(this.eth.decode_message(bytes))),
263 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
264 None => Poll::Ready(None),
265 }
266 }
267}
268
269impl<S, N> Sink<EthMessage<N>> for EthStream<S, N>
270where
271 S: CanDisconnect<Bytes> + Unpin,
272 EthStreamError: From<<S as Sink<Bytes>>::Error>,
273 N: NetworkPrimitives,
274{
275 type Error = EthStreamError;
276
277 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
278 self.project().inner.poll_ready(cx).map_err(Into::into)
279 }
280
281 fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
282 if matches!(item, EthMessage::Status(_)) {
283 return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
293 }
294
295 self.project()
296 .inner
297 .start_send(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))?;
298
299 Ok(())
300 }
301
302 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
303 self.project().inner.poll_flush(cx).map_err(Into::into)
304 }
305
306 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
307 self.project().inner.poll_close(cx).map_err(Into::into)
308 }
309}
310
311impl<S, N> CanDisconnect<EthMessage<N>> for EthStream<S, N>
312where
313 S: CanDisconnect<Bytes> + Send,
314 EthStreamError: From<<S as Sink<Bytes>>::Error>,
315 N: NetworkPrimitives,
316{
317 fn disconnect(
318 &mut self,
319 reason: DisconnectReason,
320 ) -> Pin<Box<dyn Future<Output = Result<(), EthStreamError>> + Send + '_>> {
321 Box::pin(async move { self.inner.disconnect(reason).await.map_err(Into::into) })
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::UnauthedEthStream;
    use crate::{
        broadcast::BlockHashNumber,
        errors::{EthHandshakeError, EthStreamError},
        ethstream::RawCapabilityMessage,
        hello::DEFAULT_TCP_PORT,
        p2pstream::UnauthedP2PStream,
        EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
        ProtocolVersion, Status, StatusMessage,
    };
    use alloy_chains::NamedChain;
    use alloy_primitives::{bytes::Bytes, B256, U256};
    use alloy_rlp::Decodable;
    use futures::{SinkExt, StreamExt};
    use reth_ecies::stream::ECIESStream;
    use reth_eth_wire_types::{EthNetworkPrimitives, UnifiedStatus};
    use reth_ethereum_forks::{ForkFilter, Head};
    use reth_network_peers::pk2id;
    use secp256k1::{SecretKey, SECP256K1};
    use std::time::Duration;
    use tokio::net::{TcpListener, TcpStream};
    use tokio_util::codec::Decoder;

    /// Creates a fork filter at the default head for the given genesis hash.
    fn test_fork_filter(genesis: B256) -> ForkFilter {
        ForkFilter::new(Head::default(), genesis, 0, Vec::new())
    }

    /// Builds a legacy eth/67 mainnet status with a random block hash, using the current
    /// fork id of `fork_filter`.
    fn eth67_status(
        genesis: B256,
        fork_filter: &ForkFilter,
        total_difficulty: U256,
    ) -> UnifiedStatus {
        let status = Status {
            version: EthVersion::Eth67,
            chain: NamedChain::Mainnet.into(),
            total_difficulty,
            blockhash: B256::random(),
            genesis,
            forkid: fork_filter.current(),
        };
        UnifiedStatus::from_message(StatusMessage::Legacy(status))
    }

    /// Returns a `NewBlockHashes` message with two random hashes at block numbers 5 and 6.
    fn new_block_hashes_msg() -> EthMessage<EthNetworkPrimitives> {
        EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
            vec![
                BlockHashNumber { hash: B256::random(), number: 5 },
                BlockHashNumber { hash: B256::random(), number: 6 },
            ]
            .into(),
        )
    }

    /// Builds the p2p hello message used by both peers, identified by the public key of
    /// `key`.
    fn hello_for(key: &SecretKey) -> HelloMessageWithProtocols {
        HelloMessageWithProtocols {
            protocol_version: ProtocolVersion::V5,
            client_version: "bitcoind/1.0.0".to_string(),
            protocols: vec![EthVersion::Eth67.into()],
            port: DEFAULT_TCP_PORT,
            id: pk2id(&key.public_key(SECP256K1)),
        }
    }

    /// Runs a handshake between two local peers that share the same status and asserts that
    /// each side receives the other's status.
    async fn assert_handshake_succeeds(total_difficulty: U256) {
        let genesis = B256::random();
        let fork_filter = test_fork_filter(genesis);
        let unified_status = eth67_status(genesis, &fork_filter, total_difficulty);

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        let status_clone = unified_status;
        let fork_filter_clone = fork_filter.clone();
        let handle = tokio::spawn(async move {
            // Accept the connection and run the server side of the handshake.
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = PassthroughCodec::default().framed(incoming);
            let (_, their_status) = UnauthedEthStream::new(stream)
                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
                .await
                .unwrap();

            // Both peers use the same status, so the received one must equal ours.
            assert_eq!(their_status, status_clone);
        });

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = PassthroughCodec::default().framed(outgoing);

        // Client side of the handshake.
        let (_, their_status) = UnauthedEthStream::new(sink)
            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
            .await
            .unwrap();

        assert_eq!(their_status, unified_status);

        // Surface any assertion failure from the server task.
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn can_handshake() {
        assert_handshake_succeeds(U256::ZERO).await;
    }

    #[tokio::test]
    async fn pass_handshake_on_low_td_bitlen() {
        assert_handshake_succeeds(U256::from(2).pow(U256::from(100)) - U256::from(1)).await;
    }

    #[tokio::test]
    async fn fail_handshake_on_high_td_bitlen() {
        let genesis = B256::random();
        let fork_filter = test_fork_filter(genesis);
        let unified_status =
            eth67_status(genesis, &fork_filter, U256::from(2).pow(U256::from(164)));

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        let status_clone = unified_status;
        let fork_filter_clone = fork_filter.clone();
        let handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = PassthroughCodec::default().framed(incoming);
            let handshake_res = UnauthedEthStream::new(stream)
                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
                .await;

            // The server side must fail on the oversized total difficulty.
            assert!(matches!(
                handshake_res,
                Err(EthStreamError::EthHandshakeError(
                    EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
                ))
            ));
        });

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = PassthroughCodec::default().framed(outgoing);

        let handshake_res = UnauthedEthStream::new(sink)
            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
            .await;

        // The client side must fail in the same way.
        assert!(matches!(
            handshake_res,
            Err(EthStreamError::EthHandshakeError(
                EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
            ))
        ));

        // Surface any assertion failure from the server task.
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn can_write_and_read_cleartext() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let test_msg = new_block_hashes_msg();

        let test_msg_clone = test_msg.clone();
        let handle = tokio::spawn(async move {
            // Server: read one message and compare it with what the client sent.
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = PassthroughCodec::default().framed(incoming);
            let mut stream = EthStream::new(EthVersion::Eth67, stream);

            let message = stream.next().await.unwrap().unwrap();
            assert_eq!(message, test_msg_clone);
        });

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = PassthroughCodec::default().framed(outgoing);
        let mut client_stream = EthStream::new(EthVersion::Eth67, sink);

        client_stream.send(test_msg).await.unwrap();

        handle.await.unwrap();
    }

    #[tokio::test]
    async fn can_write_and_read_ecies() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let server_key = SecretKey::new(&mut rand_08::thread_rng());
        let test_msg = new_block_hashes_msg();

        let test_msg_clone = test_msg.clone();
        let handle = tokio::spawn(async move {
            // Server: ECIES transport, then read one message.
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
            let mut stream = EthStream::new(EthVersion::Eth67, stream);

            let message = stream.next().await.unwrap().unwrap();
            assert_eq!(message, test_msg_clone);
        });

        // The client needs the server's peer id to open the ECIES connection.
        let server_id = pk2id(&server_key.public_key(SECP256K1));

        let client_key = SecretKey::new(&mut rand_08::thread_rng());

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
        let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);

        client_stream.send(test_msg).await.unwrap();

        handle.await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn ethstream_over_p2p() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let server_key = SecretKey::new(&mut rand_08::thread_rng());
        let test_msg = new_block_hashes_msg();

        let genesis = B256::random();
        let fork_filter = test_fork_filter(genesis);
        let unified_status = eth67_status(genesis, &fork_filter, U256::ZERO);

        let status_copy = unified_status;
        let fork_filter_clone = fork_filter.clone();
        let test_msg_clone = test_msg.clone();
        let handle = tokio::spawn(async move {
            // Server: ECIES -> p2p handshake -> eth handshake -> read one message.
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();

            let server_hello = hello_for(&server_key);

            let unauthed_stream = UnauthedP2PStream::new(stream);
            let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
            let (mut eth_stream, _) = UnauthedEthStream::new(p2p_stream)
                .handshake(status_copy, fork_filter_clone)
                .await
                .unwrap();

            let message = eth_stream.next().await.unwrap().unwrap();
            assert_eq!(message, test_msg_clone);
        });

        // The client needs the server's peer id to open the ECIES connection.
        let server_id = pk2id(&server_key.public_key(SECP256K1));

        let client_key = SecretKey::new(&mut rand_08::thread_rng());

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();

        let client_hello = hello_for(&client_key);

        let unauthed_stream = UnauthedP2PStream::new(sink);
        let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();

        let (mut client_stream, _) = UnauthedEthStream::new(p2p_stream)
            .handshake(unified_status, fork_filter)
            .await
            .unwrap();

        client_stream.send(test_msg).await.unwrap();

        handle.await.unwrap();
    }

    #[tokio::test]
    async fn handshake_should_timeout() {
        let genesis = B256::random();
        let fork_filter = test_fork_filter(genesis);
        let unified_status = eth67_status(genesis, &fork_filter, U256::ZERO);

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        let status_clone = unified_status;
        let fork_filter_clone = fork_filter.clone();
        let _handle = tokio::spawn(async move {
            // Delay the server well past the client's 1 second limit.
            tokio::time::sleep(Duration::from_secs(11)).await;
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = PassthroughCodec::default().framed(incoming);
            let (_, their_status) = UnauthedEthStream::new(stream)
                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
                .await
                .unwrap();

            assert_eq!(their_status, status_clone);
        });

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = PassthroughCodec::default().framed(outgoing);

        let handshake_result = UnauthedEthStream::new(sink)
            .handshake_with_timeout::<EthNetworkPrimitives>(
                unified_status,
                fork_filter,
                Duration::from_secs(1),
            )
            .await;

        // Match the variant itself rather than comparing `Display` output, so no other
        // error can pass this check.
        assert!(matches!(handshake_result, Err(EthStreamError::StreamTimeout)));
    }

    #[tokio::test]
    async fn can_write_and_read_raw_capability() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) };

        let test_msg_clone = test_msg.clone();
        let handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = PassthroughCodec::default().framed(incoming);
            let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream);

            // Read the raw frame from the transport, bypassing eth message decoding.
            let bytes = stream.inner_mut().next().await.unwrap().unwrap();

            // The frame starts with the RLP-encoded message id.
            let mut id_bytes = &bytes[..];
            let decoded_id = <usize as Decodable>::decode(&mut id_bytes).unwrap();
            assert_eq!(decoded_id, test_msg_clone.id);

            // Whatever follows the id is the untouched payload.
            let remaining = id_bytes;
            assert_eq!(remaining, &test_msg_clone.payload[..]);
        });

        let outgoing = TcpStream::connect(local_addr).await.unwrap();
        let sink = PassthroughCodec::default().framed(outgoing);
        let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink);

        client_stream.start_send_raw(test_msg).unwrap();
        client_stream.inner_mut().flush().await.unwrap();

        handle.await.unwrap();
    }
}