reth_eth_wire/
ethstream.rs

1//! Ethereum protocol stream implementations.
2//!
3//! Provides stream types for the Ethereum wire protocol.
4//! It separates protocol logic [`EthStreamInner`] from transport concerns [`EthStream`].
5//! Handles handshaking, message processing, and RLP serialization.
6
7use crate::{
8    errors::{EthHandshakeError, EthStreamError},
9    handshake::EthereumEthHandshake,
10    message::{EthBroadcastMessage, ProtocolBroadcastMessage},
11    p2pstream::HANDSHAKE_TIMEOUT,
12    CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
13    UnifiedStatus,
14};
15use alloy_primitives::bytes::{Bytes, BytesMut};
16use alloy_rlp::Encodable;
17use futures::{ready, Sink, SinkExt};
18use pin_project::pin_project;
19use reth_eth_wire_types::{NetworkPrimitives, RawCapabilityMessage};
20use reth_ethereum_forks::ForkFilter;
21use std::{
22    future::Future,
23    pin::Pin,
24    task::{Context, Poll},
25    time::Duration,
26};
27use tokio::time::timeout;
28use tokio_stream::Stream;
29use tracing::{debug, trace};
30
31/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
32// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
33pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
34
35/// [`MAX_STATUS_SIZE`] is the maximum cap on the size of the initial status message
36pub(crate) const MAX_STATUS_SIZE: usize = 500 * 1024;
37
38/// An un-authenticated [`EthStream`]. This is consumed and returns a [`EthStream`] after the
39/// `Status` handshake is completed.
40#[pin_project]
41#[derive(Debug)]
42pub struct UnauthedEthStream<S> {
43    #[pin]
44    inner: S,
45}
46
47impl<S> UnauthedEthStream<S> {
48    /// Create a new `UnauthedEthStream` from a type `S` which implements `Stream` and `Sink`.
49    pub const fn new(inner: S) -> Self {
50        Self { inner }
51    }
52
53    /// Consumes the type and returns the wrapped stream
54    pub fn into_inner(self) -> S {
55        self.inner
56    }
57}
58
59impl<S, E> UnauthedEthStream<S>
60where
61    S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
62    EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
63{
64    /// Consumes the [`UnauthedEthStream`] and returns an [`EthStream`] after the `Status`
65    /// handshake is completed successfully. This also returns the `Status` message sent by the
66    /// remote peer.
67    ///
68    /// Caution: This expects that the [`UnifiedStatus`] has the proper eth version configured, with
69    /// ETH69 the initial status message changed.
70    pub async fn handshake<N: NetworkPrimitives>(
71        self,
72        status: UnifiedStatus,
73        fork_filter: ForkFilter,
74    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
75        self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await
76    }
77
78    /// Wrapper around handshake which enforces a timeout.
79    pub async fn handshake_with_timeout<N: NetworkPrimitives>(
80        self,
81        status: UnifiedStatus,
82        fork_filter: ForkFilter,
83        timeout_limit: Duration,
84    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
85        timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter))
86            .await
87            .map_err(|_| EthStreamError::StreamTimeout)?
88    }
89
90    /// Handshake with no timeout
91    pub async fn handshake_without_timeout<N: NetworkPrimitives>(
92        mut self,
93        status: UnifiedStatus,
94        fork_filter: ForkFilter,
95    ) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
96        trace!(
97            status = %status.into_message(),
98            "sending eth status to peer"
99        );
100        let their_status =
101            EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
102
103        // now we can create the `EthStream` because the peer has successfully completed
104        // the handshake
105        let stream = EthStream::new(status.version, self.inner);
106
107        Ok((stream, their_status))
108    }
109}
110
111/// Contains eth protocol specific logic for processing messages
112#[derive(Debug)]
113pub struct EthStreamInner<N> {
114    /// Negotiated eth version
115    version: EthVersion,
116    _pd: std::marker::PhantomData<N>,
117}
118
119impl<N> EthStreamInner<N>
120where
121    N: NetworkPrimitives,
122{
123    /// Creates a new [`EthStreamInner`] with the given eth version
124    pub const fn new(version: EthVersion) -> Self {
125        Self { version, _pd: std::marker::PhantomData }
126    }
127
128    /// Returns the eth version
129    #[inline]
130    pub const fn version(&self) -> EthVersion {
131        self.version
132    }
133
134    /// Decodes incoming bytes into an [`EthMessage`].
135    pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
136        if bytes.len() > MAX_MESSAGE_SIZE {
137            return Err(EthStreamError::MessageTooBig(bytes.len()));
138        }
139
140        let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
141            Ok(m) => m,
142            Err(err) => {
143                let msg = if bytes.len() > 50 {
144                    format!("{:02x?}...{:x?}", &bytes[..10], &bytes[bytes.len() - 10..])
145                } else {
146                    format!("{bytes:02x?}")
147                };
148                debug!(
149                    version=?self.version,
150                    %msg,
151                    "failed to decode protocol message"
152                );
153                return Err(EthStreamError::InvalidMessage(err));
154            }
155        };
156
157        if matches!(msg.message, EthMessage::Status(_)) {
158            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
159        }
160
161        Ok(msg.message)
162    }
163
164    /// Encodes an [`EthMessage`] to bytes.
165    ///
166    /// Validates that Status messages are not sent after handshake, enforcing protocol rules.
167    pub fn encode_message(&self, item: EthMessage<N>) -> Result<Bytes, EthStreamError> {
168        if matches!(item, EthMessage::Status(_)) {
169            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake));
170        }
171
172        Ok(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))
173    }
174}
175
176/// An `EthStream` wraps over any `Stream` that yields bytes and makes it
177/// compatible with eth-networking protocol messages, which get RLP encoded/decoded.
178#[pin_project]
179#[derive(Debug)]
180pub struct EthStream<S, N = EthNetworkPrimitives> {
181    /// Eth-specific logic
182    eth: EthStreamInner<N>,
183    #[pin]
184    inner: S,
185}
186
187impl<S, N: NetworkPrimitives> EthStream<S, N> {
188    /// Creates a new unauthed [`EthStream`] from a provided stream. You will need
189    /// to manually handshake a peer.
190    #[inline]
191    pub const fn new(version: EthVersion, inner: S) -> Self {
192        Self { eth: EthStreamInner::new(version), inner }
193    }
194
195    /// Returns the eth version.
196    #[inline]
197    pub const fn version(&self) -> EthVersion {
198        self.eth.version()
199    }
200
201    /// Returns the underlying stream.
202    #[inline]
203    pub const fn inner(&self) -> &S {
204        &self.inner
205    }
206
207    /// Returns mutable access to the underlying stream.
208    #[inline]
209    pub const fn inner_mut(&mut self) -> &mut S {
210        &mut self.inner
211    }
212
213    /// Consumes this type and returns the wrapped stream.
214    #[inline]
215    pub fn into_inner(self) -> S {
216        self.inner
217    }
218}
219
220impl<S, E, N> EthStream<S, N>
221where
222    S: Sink<Bytes, Error = E> + Unpin,
223    EthStreamError: From<E>,
224    N: NetworkPrimitives,
225{
226    /// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead.
227    pub fn start_send_broadcast(
228        &mut self,
229        item: EthBroadcastMessage<N>,
230    ) -> Result<(), EthStreamError> {
231        self.inner.start_send_unpin(Bytes::from(alloy_rlp::encode(
232            ProtocolBroadcastMessage::from(item),
233        )))?;
234
235        Ok(())
236    }
237
238    /// Sends a raw capability message directly over the stream
239    pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
240        let mut bytes = Vec::with_capacity(msg.payload.len() + 1);
241        msg.id.encode(&mut bytes);
242        bytes.extend_from_slice(&msg.payload);
243
244        self.inner.start_send_unpin(bytes.into())?;
245        Ok(())
246    }
247}
248
249impl<S, E, N> Stream for EthStream<S, N>
250where
251    S: Stream<Item = Result<BytesMut, E>> + Unpin,
252    EthStreamError: From<E>,
253    N: NetworkPrimitives,
254{
255    type Item = Result<EthMessage<N>, EthStreamError>;
256
257    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258        let this = self.project();
259        let res = ready!(this.inner.poll_next(cx));
260
261        match res {
262            Some(Ok(bytes)) => Poll::Ready(Some(this.eth.decode_message(bytes))),
263            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
264            None => Poll::Ready(None),
265        }
266    }
267}
268
269impl<S, N> Sink<EthMessage<N>> for EthStream<S, N>
270where
271    S: CanDisconnect<Bytes> + Unpin,
272    EthStreamError: From<<S as Sink<Bytes>>::Error>,
273    N: NetworkPrimitives,
274{
275    type Error = EthStreamError;
276
277    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
278        self.project().inner.poll_ready(cx).map_err(Into::into)
279    }
280
281    fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
282        if matches!(item, EthMessage::Status(_)) {
283            // TODO: to disconnect here we would need to do something similar to P2PStream's
284            // start_disconnect, which would ideally be a part of the CanDisconnect trait, or at
285            // least similar.
286            //
287            // Other parts of reth do not yet need traits like CanDisconnect because atm they work
288            // exclusively with EthStream<P2PStream<S>>, where the inner P2PStream is accessible,
289            // allowing for its start_disconnect method to be called.
290            //
291            // self.project().inner.start_disconnect(DisconnectReason::ProtocolBreach);
292            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
293        }
294
295        self.project()
296            .inner
297            .start_send(Bytes::from(alloy_rlp::encode(ProtocolMessage::from(item))))?;
298
299        Ok(())
300    }
301
302    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
303        self.project().inner.poll_flush(cx).map_err(Into::into)
304    }
305
306    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
307        self.project().inner.poll_close(cx).map_err(Into::into)
308    }
309}
310
311impl<S, N> CanDisconnect<EthMessage<N>> for EthStream<S, N>
312where
313    S: CanDisconnect<Bytes> + Send,
314    EthStreamError: From<<S as Sink<Bytes>>::Error>,
315    N: NetworkPrimitives,
316{
317    fn disconnect(
318        &mut self,
319        reason: DisconnectReason,
320    ) -> Pin<Box<dyn Future<Output = Result<(), EthStreamError>> + Send + '_>> {
321        Box::pin(async move { self.inner.disconnect(reason).await.map_err(Into::into) })
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::UnauthedEthStream;
328    use crate::{
329        broadcast::BlockHashNumber,
330        errors::{EthHandshakeError, EthStreamError},
331        ethstream::RawCapabilityMessage,
332        hello::DEFAULT_TCP_PORT,
333        p2pstream::UnauthedP2PStream,
334        EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
335        ProtocolVersion, Status, StatusMessage,
336    };
337    use alloy_chains::NamedChain;
338    use alloy_primitives::{bytes::Bytes, B256, U256};
339    use alloy_rlp::Decodable;
340    use futures::{SinkExt, StreamExt};
341    use reth_ecies::stream::ECIESStream;
342    use reth_eth_wire_types::{EthNetworkPrimitives, UnifiedStatus};
343    use reth_ethereum_forks::{ForkFilter, Head};
344    use reth_network_peers::pk2id;
345    use secp256k1::{SecretKey, SECP256K1};
346    use std::time::Duration;
347    use tokio::net::{TcpListener, TcpStream};
348    use tokio_util::codec::Decoder;
349
350    #[tokio::test]
351    async fn can_handshake() {
352        let genesis = B256::random();
353        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
354
355        let status = Status {
356            version: EthVersion::Eth67,
357            chain: NamedChain::Mainnet.into(),
358            total_difficulty: U256::ZERO,
359            blockhash: B256::random(),
360            genesis,
361            // Pass the current fork id.
362            forkid: fork_filter.current(),
363        };
364        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
365
366        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
367        let local_addr = listener.local_addr().unwrap();
368
369        let status_clone = unified_status;
370        let fork_filter_clone = fork_filter.clone();
371        let handle = tokio::spawn(async move {
372            // roughly based off of the design of tokio::net::TcpListener
373            let (incoming, _) = listener.accept().await.unwrap();
374            let stream = PassthroughCodec::default().framed(incoming);
375            let (_, their_status) = UnauthedEthStream::new(stream)
376                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
377                .await
378                .unwrap();
379
380            // just make sure it equals our status (our status is a clone of their status)
381            assert_eq!(their_status, status_clone);
382        });
383
384        let outgoing = TcpStream::connect(local_addr).await.unwrap();
385        let sink = PassthroughCodec::default().framed(outgoing);
386
387        // try to connect
388        let (_, their_status) = UnauthedEthStream::new(sink)
389            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
390            .await
391            .unwrap();
392
393        // their status is a clone of our status, these should be equal
394        assert_eq!(their_status, unified_status);
395
396        // wait for it to finish
397        handle.await.unwrap();
398    }
399
400    #[tokio::test]
401    async fn pass_handshake_on_low_td_bitlen() {
402        let genesis = B256::random();
403        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
404
405        let status = Status {
406            version: EthVersion::Eth67,
407            chain: NamedChain::Mainnet.into(),
408            total_difficulty: U256::from(2).pow(U256::from(100)) - U256::from(1),
409            blockhash: B256::random(),
410            genesis,
411            // Pass the current fork id.
412            forkid: fork_filter.current(),
413        };
414        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
415
416        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
417        let local_addr = listener.local_addr().unwrap();
418
419        let status_clone = unified_status;
420        let fork_filter_clone = fork_filter.clone();
421        let handle = tokio::spawn(async move {
422            // roughly based off of the design of tokio::net::TcpListener
423            let (incoming, _) = listener.accept().await.unwrap();
424            let stream = PassthroughCodec::default().framed(incoming);
425            let (_, their_status) = UnauthedEthStream::new(stream)
426                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
427                .await
428                .unwrap();
429
430            // just make sure it equals our status, and that the handshake succeeded
431            assert_eq!(their_status, status_clone);
432        });
433
434        let outgoing = TcpStream::connect(local_addr).await.unwrap();
435        let sink = PassthroughCodec::default().framed(outgoing);
436
437        // try to connect
438        let (_, their_status) = UnauthedEthStream::new(sink)
439            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
440            .await
441            .unwrap();
442
443        // their status is a clone of our status, these should be equal
444        assert_eq!(their_status, unified_status);
445
446        // await the other handshake
447        handle.await.unwrap();
448    }
449
450    #[tokio::test]
451    async fn fail_handshake_on_high_td_bitlen() {
452        let genesis = B256::random();
453        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
454
455        let status = Status {
456            version: EthVersion::Eth67,
457            chain: NamedChain::Mainnet.into(),
458            total_difficulty: U256::from(2).pow(U256::from(164)),
459            blockhash: B256::random(),
460            genesis,
461            // Pass the current fork id.
462            forkid: fork_filter.current(),
463        };
464        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
465
466        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
467        let local_addr = listener.local_addr().unwrap();
468
469        let status_clone = unified_status;
470        let fork_filter_clone = fork_filter.clone();
471        let handle = tokio::spawn(async move {
472            // roughly based off of the design of tokio::net::TcpListener
473            let (incoming, _) = listener.accept().await.unwrap();
474            let stream = PassthroughCodec::default().framed(incoming);
475            let handshake_res = UnauthedEthStream::new(stream)
476                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
477                .await;
478
479            // make sure the handshake fails due to td too high
480            assert!(matches!(
481                handshake_res,
482                Err(EthStreamError::EthHandshakeError(
483                    EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
484                ))
485            ));
486        });
487
488        let outgoing = TcpStream::connect(local_addr).await.unwrap();
489        let sink = PassthroughCodec::default().framed(outgoing);
490
491        // try to connect
492        let handshake_res = UnauthedEthStream::new(sink)
493            .handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
494            .await;
495
496        // this handshake should also fail due to td too high
497        assert!(matches!(
498            handshake_res,
499            Err(EthStreamError::EthHandshakeError(
500                EthHandshakeError::TotalDifficultyBitLenTooLarge { got: 165, maximum: 160 }
501            ))
502        ));
503
504        // await the other handshake
505        handle.await.unwrap();
506    }
507
508    #[tokio::test]
509    async fn can_write_and_read_cleartext() {
510        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
511        let local_addr = listener.local_addr().unwrap();
512        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
513            vec![
514                BlockHashNumber { hash: B256::random(), number: 5 },
515                BlockHashNumber { hash: B256::random(), number: 6 },
516            ]
517            .into(),
518        );
519
520        let test_msg_clone = test_msg.clone();
521        let handle = tokio::spawn(async move {
522            // roughly based off of the design of tokio::net::TcpListener
523            let (incoming, _) = listener.accept().await.unwrap();
524            let stream = PassthroughCodec::default().framed(incoming);
525            let mut stream = EthStream::new(EthVersion::Eth67, stream);
526
527            // use the stream to get the next message
528            let message = stream.next().await.unwrap().unwrap();
529            assert_eq!(message, test_msg_clone);
530        });
531
532        let outgoing = TcpStream::connect(local_addr).await.unwrap();
533        let sink = PassthroughCodec::default().framed(outgoing);
534        let mut client_stream = EthStream::new(EthVersion::Eth67, sink);
535
536        client_stream.send(test_msg).await.unwrap();
537
538        // make sure the server receives the message and asserts before ending the test
539        handle.await.unwrap();
540    }
541
542    #[tokio::test]
543    async fn can_write_and_read_ecies() {
544        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
545        let local_addr = listener.local_addr().unwrap();
546        let server_key = SecretKey::new(&mut rand_08::thread_rng());
547        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
548            vec![
549                BlockHashNumber { hash: B256::random(), number: 5 },
550                BlockHashNumber { hash: B256::random(), number: 6 },
551            ]
552            .into(),
553        );
554
555        let test_msg_clone = test_msg.clone();
556        let handle = tokio::spawn(async move {
557            // roughly based off of the design of tokio::net::TcpListener
558            let (incoming, _) = listener.accept().await.unwrap();
559            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
560            let mut stream = EthStream::new(EthVersion::Eth67, stream);
561
562            // use the stream to get the next message
563            let message = stream.next().await.unwrap().unwrap();
564            assert_eq!(message, test_msg_clone);
565        });
566
567        // create the server pubkey
568        let server_id = pk2id(&server_key.public_key(SECP256K1));
569
570        let client_key = SecretKey::new(&mut rand_08::thread_rng());
571
572        let outgoing = TcpStream::connect(local_addr).await.unwrap();
573        let outgoing = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
574        let mut client_stream = EthStream::new(EthVersion::Eth67, outgoing);
575
576        client_stream.send(test_msg).await.unwrap();
577
578        // make sure the server receives the message and asserts before ending the test
579        handle.await.unwrap();
580    }
581
582    #[tokio::test(flavor = "multi_thread")]
583    async fn ethstream_over_p2p() {
584        // create a p2p stream and server, then confirm that the two are authed
585        // create tcpstream
586        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
587        let local_addr = listener.local_addr().unwrap();
588        let server_key = SecretKey::new(&mut rand_08::thread_rng());
589        let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
590            vec![
591                BlockHashNumber { hash: B256::random(), number: 5 },
592                BlockHashNumber { hash: B256::random(), number: 6 },
593            ]
594            .into(),
595        );
596
597        let genesis = B256::random();
598        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
599
600        let status = Status {
601            version: EthVersion::Eth67,
602            chain: NamedChain::Mainnet.into(),
603            total_difficulty: U256::ZERO,
604            blockhash: B256::random(),
605            genesis,
606            // Pass the current fork id.
607            forkid: fork_filter.current(),
608        };
609        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
610
611        let status_copy = unified_status;
612        let fork_filter_clone = fork_filter.clone();
613        let test_msg_clone = test_msg.clone();
614        let handle = tokio::spawn(async move {
615            // roughly based off of the design of tokio::net::TcpListener
616            let (incoming, _) = listener.accept().await.unwrap();
617            let stream = ECIESStream::incoming(incoming, server_key).await.unwrap();
618
619            let server_hello = HelloMessageWithProtocols {
620                protocol_version: ProtocolVersion::V5,
621                client_version: "bitcoind/1.0.0".to_string(),
622                protocols: vec![EthVersion::Eth67.into()],
623                port: DEFAULT_TCP_PORT,
624                id: pk2id(&server_key.public_key(SECP256K1)),
625            };
626
627            let unauthed_stream = UnauthedP2PStream::new(stream);
628            let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
629            let (mut eth_stream, _) = UnauthedEthStream::new(p2p_stream)
630                .handshake(status_copy, fork_filter_clone)
631                .await
632                .unwrap();
633
634            // use the stream to get the next message
635            let message = eth_stream.next().await.unwrap().unwrap();
636            assert_eq!(message, test_msg_clone);
637        });
638
639        // create the server pubkey
640        let server_id = pk2id(&server_key.public_key(SECP256K1));
641
642        let client_key = SecretKey::new(&mut rand_08::thread_rng());
643
644        let outgoing = TcpStream::connect(local_addr).await.unwrap();
645        let sink = ECIESStream::connect(outgoing, client_key, server_id).await.unwrap();
646
647        let client_hello = HelloMessageWithProtocols {
648            protocol_version: ProtocolVersion::V5,
649            client_version: "bitcoind/1.0.0".to_string(),
650            protocols: vec![EthVersion::Eth67.into()],
651            port: DEFAULT_TCP_PORT,
652            id: pk2id(&client_key.public_key(SECP256K1)),
653        };
654
655        let unauthed_stream = UnauthedP2PStream::new(sink);
656        let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
657
658        let (mut client_stream, _) = UnauthedEthStream::new(p2p_stream)
659            .handshake(unified_status, fork_filter)
660            .await
661            .unwrap();
662
663        client_stream.send(test_msg).await.unwrap();
664
665        // make sure the server receives the message and asserts before ending the test
666        handle.await.unwrap();
667    }
668
669    #[tokio::test]
670    async fn handshake_should_timeout() {
671        let genesis = B256::random();
672        let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
673
674        let status = Status {
675            version: EthVersion::Eth67,
676            chain: NamedChain::Mainnet.into(),
677            total_difficulty: U256::ZERO,
678            blockhash: B256::random(),
679            genesis,
680            // Pass the current fork id.
681            forkid: fork_filter.current(),
682        };
683        let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
684
685        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
686        let local_addr = listener.local_addr().unwrap();
687
688        let status_clone = unified_status;
689        let fork_filter_clone = fork_filter.clone();
690        let _handle = tokio::spawn(async move {
691            // Delay accepting the connection for longer than the client's timeout period
692            tokio::time::sleep(Duration::from_secs(11)).await;
693            // roughly based off of the design of tokio::net::TcpListener
694            let (incoming, _) = listener.accept().await.unwrap();
695            let stream = PassthroughCodec::default().framed(incoming);
696            let (_, their_status) = UnauthedEthStream::new(stream)
697                .handshake::<EthNetworkPrimitives>(status_clone, fork_filter_clone)
698                .await
699                .unwrap();
700
701            // just make sure it equals our status (our status is a clone of their status)
702            assert_eq!(their_status, status_clone);
703        });
704
705        let outgoing = TcpStream::connect(local_addr).await.unwrap();
706        let sink = PassthroughCodec::default().framed(outgoing);
707
708        // try to connect
709        let handshake_result = UnauthedEthStream::new(sink)
710            .handshake_with_timeout::<EthNetworkPrimitives>(
711                unified_status,
712                fork_filter,
713                Duration::from_secs(1),
714            )
715            .await;
716
717        // Assert that a timeout error occurred
718        assert!(
719            matches!(handshake_result, Err(e) if e.to_string() == EthStreamError::StreamTimeout.to_string())
720        );
721    }
722
723    #[tokio::test]
724    async fn can_write_and_read_raw_capability() {
725        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
726        let local_addr = listener.local_addr().unwrap();
727
728        let test_msg = RawCapabilityMessage { id: 0x1234, payload: Bytes::from(vec![1, 2, 3, 4]) };
729
730        let test_msg_clone = test_msg.clone();
731        let handle = tokio::spawn(async move {
732            let (incoming, _) = listener.accept().await.unwrap();
733            let stream = PassthroughCodec::default().framed(incoming);
734            let mut stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, stream);
735
736            let bytes = stream.inner_mut().next().await.unwrap().unwrap();
737
738            // Create a cursor to track position while decoding
739            let mut id_bytes = &bytes[..];
740            let decoded_id = <usize as Decodable>::decode(&mut id_bytes).unwrap();
741            assert_eq!(decoded_id, test_msg_clone.id);
742
743            // Get remaining bytes after ID decoding
744            let remaining = id_bytes;
745            assert_eq!(remaining, &test_msg_clone.payload[..]);
746        });
747
748        let outgoing = TcpStream::connect(local_addr).await.unwrap();
749        let sink = PassthroughCodec::default().framed(outgoing);
750        let mut client_stream = EthStream::<_, EthNetworkPrimitives>::new(EthVersion::Eth67, sink);
751
752        client_stream.start_send_raw(test_msg).unwrap();
753        client_stream.inner_mut().flush().await.unwrap();
754
755        handle.await.unwrap();
756    }
757}