1use std::{
11 collections::VecDeque,
12 fmt,
13 future::Future,
14 io,
15 pin::{pin, Pin},
16 task::{ready, Context, Poll},
17};
18
19use crate::{
20 capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
21 errors::{EthStreamError, P2PStreamError},
22 p2pstream::DisconnectP2P,
23 CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream,
24};
25use bytes::{Bytes, BytesMut};
26use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
27use reth_eth_wire_types::NetworkPrimitives;
28use reth_ethereum_forks::ForkFilter;
29use tokio::sync::{mpsc, mpsc::UnboundedSender};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31
/// Entry point for running several protocols over a single [`P2PStream`].
///
/// This is a thin wrapper around the internal multiplexer state. Protocols can be registered via
/// [`Self::install_protocol`], and the multiplexer can be turned into an [`RlpxSatelliteStream`]
/// with one of the `into_*satellite_stream*` methods.
#[derive(Debug)]
pub struct RlpxProtocolMultiplexer<St> {
    /// Connection, installed protocol streams and the outgoing message buffer.
    inner: MultiplexInner<St>,
}
38
39impl<St> RlpxProtocolMultiplexer<St> {
40 pub fn new(conn: P2PStream<St>) -> Self {
42 Self {
43 inner: MultiplexInner {
44 conn,
45 protocols: Default::default(),
46 out_buffer: Default::default(),
47 },
48 }
49 }
50
51 pub fn install_protocol<F, Proto>(
56 &mut self,
57 cap: &Capability,
58 f: F,
59 ) -> Result<(), UnsupportedCapabilityError>
60 where
61 F: FnOnce(ProtocolConnection) -> Proto,
62 Proto: Stream<Item = BytesMut> + Send + 'static,
63 {
64 self.inner.install_protocol(cap, f)
65 }
66
67 pub const fn shared_capabilities(&self) -> &SharedCapabilities {
69 self.inner.shared_capabilities()
70 }
71
72 pub fn into_satellite_stream<F, Primary>(
74 self,
75 cap: &Capability,
76 primary: F,
77 ) -> Result<RlpxSatelliteStream<St, Primary>, P2PStreamError>
78 where
79 F: FnOnce(ProtocolProxy) -> Primary,
80 {
81 let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
82 else {
83 return Err(P2PStreamError::CapabilityNotShared)
84 };
85
86 let (to_primary, from_wire) = mpsc::unbounded_channel();
87 let (to_wire, from_primary) = mpsc::unbounded_channel();
88 let proxy = ProtocolProxy {
89 shared_cap: shared_cap.clone(),
90 from_wire: UnboundedReceiverStream::new(from_wire),
91 to_wire,
92 };
93
94 let st = primary(proxy);
95 Ok(RlpxSatelliteStream {
96 inner: self.inner,
97 primary: PrimaryProtocol {
98 to_primary,
99 from_primary: UnboundedReceiverStream::new(from_primary),
100 st,
101 shared_cap,
102 },
103 })
104 }
105
106 pub async fn into_satellite_stream_with_handshake<F, Fut, Err, Primary>(
111 self,
112 cap: &Capability,
113 handshake: F,
114 ) -> Result<RlpxSatelliteStream<St, Primary>, Err>
115 where
116 F: FnOnce(ProtocolProxy) -> Fut,
117 Fut: Future<Output = Result<Primary, Err>>,
118 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
119 P2PStreamError: Into<Err>,
120 {
121 self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move {
122 let st = handshake(proxy).await?;
123 Ok((st, ()))
124 })
125 .await
126 .map(|(st, _)| st)
127 }
128
129 pub async fn into_satellite_stream_with_tuple_handshake<F, Fut, Err, Primary, Extra>(
139 mut self,
140 cap: &Capability,
141 handshake: F,
142 ) -> Result<(RlpxSatelliteStream<St, Primary>, Extra), Err>
143 where
144 F: FnOnce(ProtocolProxy) -> Fut,
145 Fut: Future<Output = Result<(Primary, Extra), Err>>,
146 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
147 P2PStreamError: Into<Err>,
148 {
149 let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
150 else {
151 return Err(P2PStreamError::CapabilityNotShared.into())
152 };
153
154 let (to_primary, from_wire) = mpsc::unbounded_channel();
155 let (to_wire, mut from_primary) = mpsc::unbounded_channel();
156 let proxy = ProtocolProxy {
157 shared_cap: shared_cap.clone(),
158 from_wire: UnboundedReceiverStream::new(from_wire),
159 to_wire,
160 };
161
162 let f = handshake(proxy);
163 let mut f = pin!(f);
164
165 loop {
168 tokio::select! {
169 Some(Ok(msg)) = self.inner.conn.next() => {
170 let Some(offset) = msg.first().copied()
172 else {
173 return Err(P2PStreamError::EmptyProtocolMessage.into())
174 };
175 if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() {
176 if cap == shared_cap {
177 let _ = to_primary.send(msg);
179 } else {
180 self.inner.delegate_message(&cap, msg);
182 }
183 } else {
184 return Err(P2PStreamError::UnknownReservedMessageId(offset).into())
185 }
186 }
187 Some(msg) = from_primary.recv() => {
188 self.inner.conn.send(msg).await.map_err(Into::into)?;
189 }
190 res = &mut f => {
191 let (st, extra) = res?;
192 return Ok((RlpxSatelliteStream {
193 inner: self.inner,
194 primary: PrimaryProtocol {
195 to_primary,
196 from_primary: UnboundedReceiverStream::new(from_primary),
197 st,
198 shared_cap,
199 }
200 }, extra))
201 }
202 }
203 }
204 }
205
206 pub async fn into_eth_satellite_stream<N: NetworkPrimitives>(
209 self,
210 status: Status,
211 fork_filter: ForkFilter,
212 ) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, Status), EthStreamError>
213 where
214 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
215 {
216 let eth_cap = self.inner.conn.shared_capabilities().eth_version()?;
217 self.into_satellite_stream_with_tuple_handshake(
218 &Capability::eth(eth_cap),
219 move |proxy| async move {
220 UnauthedEthStream::new(proxy).handshake(status, fork_filter).await
221 },
222 )
223 .await
224 }
225}
226
/// State shared by [`RlpxProtocolMultiplexer`] and [`RlpxSatelliteStream`].
#[derive(Debug)]
struct MultiplexInner<St> {
    /// The underlying p2p connection.
    conn: P2PStream<St>,
    /// Protocol streams registered through `install_protocol`.
    protocols: Vec<ProtocolStream>,
    /// Messages queued for sending over `conn`.
    out_buffer: VecDeque<Bytes>,
}
236
237impl<St> MultiplexInner<St> {
238 const fn shared_capabilities(&self) -> &SharedCapabilities {
239 self.conn.shared_capabilities()
240 }
241
242 fn delegate_message(&self, cap: &SharedCapability, msg: BytesMut) -> bool {
244 for proto in &self.protocols {
245 if proto.shared_cap == *cap {
246 proto.send_raw(msg);
247 return true
248 }
249 }
250 false
251 }
252
253 fn install_protocol<F, Proto>(
254 &mut self,
255 cap: &Capability,
256 f: F,
257 ) -> Result<(), UnsupportedCapabilityError>
258 where
259 F: FnOnce(ProtocolConnection) -> Proto,
260 Proto: Stream<Item = BytesMut> + Send + 'static,
261 {
262 let shared_cap =
263 self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?;
264 let (to_satellite, rx) = mpsc::unbounded_channel();
265 let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) };
266 let st = f(proto_conn);
267 let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) };
268 self.protocols.push(st);
269 Ok(())
270 }
271}
272
/// Bookkeeping for the primary protocol of an [`RlpxSatelliteStream`].
#[derive(Debug)]
struct PrimaryProtocol<Primary> {
    /// Sender for wire messages that belong to the primary protocol.
    to_primary: UnboundedSender<BytesMut>,
    /// Messages written by the primary protocol that still have to go out over the wire.
    from_primary: UnboundedReceiverStream<Bytes>,
    /// Shared capability of the primary protocol.
    shared_cap: SharedCapability,
    /// The primary protocol stream itself.
    st: Primary,
}
285
/// Stream/sink handed to the primary protocol in place of the raw connection.
///
/// As a [`Stream`] it yields messages received from the wire with the message id unmasked; as a
/// [`Sink`] it masks the message id and queues the message for sending.
#[derive(Debug)]
pub struct ProtocolProxy {
    /// Shared capability whose relative message id offset is used for (un)masking.
    shared_cap: SharedCapability,
    /// Messages received from the wire for this protocol.
    from_wire: UnboundedReceiverStream<BytesMut>,
    /// Sender for messages that should go out over the wire.
    to_wire: UnboundedSender<Bytes>,
}
297
298impl ProtocolProxy {
299 fn try_send(&self, msg: Bytes) -> Result<(), io::Error> {
301 if msg.is_empty() {
302 return Err(io::ErrorKind::InvalidInput.into())
304 }
305 self.to_wire.send(self.mask_msg_id(msg)?).map_err(|_| io::ErrorKind::BrokenPipe.into())
306 }
307
308 #[inline]
310 fn mask_msg_id(&self, msg: Bytes) -> Result<Bytes, io::Error> {
311 if msg.is_empty() {
312 return Err(io::ErrorKind::InvalidInput.into())
314 }
315
316 let offset = self.shared_cap.relative_message_id_offset();
317 if offset == 0 {
318 return Ok(msg);
319 }
320
321 let mut masked = Vec::from(msg);
322 masked[0] = masked[0].checked_add(offset).ok_or(io::ErrorKind::InvalidInput)?;
323 Ok(masked.into())
324 }
325
326 #[inline]
328 fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
329 if msg.is_empty() {
330 return Err(io::ErrorKind::InvalidInput.into())
332 }
333 msg[0] = msg[0]
334 .checked_sub(self.shared_cap.relative_message_id_offset())
335 .ok_or(io::ErrorKind::InvalidInput)?;
336 Ok(msg)
337 }
338}
339
340impl Stream for ProtocolProxy {
341 type Item = Result<BytesMut, io::Error>;
342
343 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
344 let msg = ready!(self.from_wire.poll_next_unpin(cx));
345 Poll::Ready(msg.map(|msg| self.get_mut().unmask_id(msg)))
346 }
347}
348
349impl Sink<Bytes> for ProtocolProxy {
350 type Error = io::Error;
351
352 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
353 Poll::Ready(Ok(()))
354 }
355
356 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
357 self.get_mut().try_send(item)
358 }
359
360 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
361 Poll::Ready(Ok(()))
362 }
363
364 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
365 Poll::Ready(Ok(()))
366 }
367}
368
369impl CanDisconnect<Bytes> for ProtocolProxy {
370 fn disconnect(
371 &mut self,
372 _reason: DisconnectReason,
373 ) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<Bytes>>::Error>> + Send + '_>> {
374 Box::pin(async move { Ok(()) })
376 }
377}
378
/// Incoming half of an installed protocol.
///
/// As a [`Stream`] it yields the messages that were received from the wire for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
    /// Messages delegated to this protocol.
    from_wire: UnboundedReceiverStream<BytesMut>,
}
386
387impl Stream for ProtocolConnection {
388 type Item = BytesMut;
389
390 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
391 self.from_wire.poll_next_unpin(cx)
392 }
393}
394
/// A connection with one primary protocol and any number of additionally installed protocols.
///
/// As a [`Stream`] it yields the items of the primary protocol while also driving the
/// connection and the installed protocols; as a [`Sink`] it forwards items to the primary
/// protocol.
#[derive(Debug)]
pub struct RlpxSatelliteStream<St, Primary> {
    /// Connection, installed protocol streams and the outgoing message buffer.
    inner: MultiplexInner<St>,
    /// State of the primary protocol.
    primary: PrimaryProtocol<Primary>,
}
402
impl<St, Primary> RlpxSatelliteStream<St, Primary> {
    /// Registers an additional protocol for the capability `cap`.
    ///
    /// `f` receives the [`ProtocolConnection`] that yields the incoming messages of this
    /// protocol and returns the stream of outgoing messages.
    ///
    /// # Errors
    ///
    /// Returns an [`UnsupportedCapabilityError`] if `cap` does not match a shared capability of
    /// the connection.
    pub fn install_protocol<F, Proto>(
        &mut self,
        cap: &Capability,
        f: F,
    ) -> Result<(), UnsupportedCapabilityError>
    where
        F: FnOnce(ProtocolConnection) -> Proto,
        Proto: Stream<Item = BytesMut> + Send + 'static,
    {
        self.inner.install_protocol(cap, f)
    }

    /// Returns a reference to the primary protocol stream.
    #[inline]
    pub const fn primary(&self) -> &Primary {
        &self.primary.st
    }

    /// Returns a mutable reference to the primary protocol stream.
    #[inline]
    pub const fn primary_mut(&mut self) -> &mut Primary {
        &mut self.primary.st
    }

    /// Returns a reference to the underlying [`P2PStream`].
    #[inline]
    pub const fn inner(&self) -> &P2PStream<St> {
        &self.inner.conn
    }

    /// Returns a mutable reference to the underlying [`P2PStream`].
    #[inline]
    pub const fn inner_mut(&mut self) -> &mut P2PStream<St> {
        &mut self.inner.conn
    }

    /// Consumes the stream and returns the underlying [`P2PStream`].
    ///
    /// The primary protocol, the installed protocols and any buffered outgoing messages are
    /// dropped.
    #[inline]
    pub fn into_inner(self) -> P2PStream<St> {
        self.inner.conn
    }
}
450
impl<St, Primary, PrimaryErr> Stream for RlpxSatelliteStream<St, Primary>
where
    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
    Primary: TryStream<Error = PrimaryErr> + Unpin,
    P2PStreamError: Into<PrimaryErr>,
{
    type Item = Result<Primary::Ok, Primary::Error>;

    /// Drives the connection, the primary protocol and all installed protocols.
    ///
    /// Only items of the primary protocol are yielded. Connection-level errors are converted
    /// into the primary protocol's error type and yielded as `Err` items.
    ///
    /// The stream ends (`None`) when the primary protocol's outgoing channel is closed, when an
    /// installed protocol's stream ends, or when the connection itself ends.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
            // 1. Give the primary protocol the first chance to produce an item. Anything other
            //    than a ready item (pending or `None`) falls through to the steps below.
            if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) {
                return Poll::Ready(Some(msg))
            }

            // 2. Move buffered outgoing messages into the connection for as long as it reports
            //    readiness.
            let mut conn_ready = true;
            loop {
                match this.inner.conn.poll_ready_unpin(cx) {
                    Poll::Ready(Ok(())) => {
                        if let Some(msg) = this.inner.out_buffer.pop_front() {
                            if let Err(err) = this.inner.conn.start_send_unpin(msg) {
                                return Poll::Ready(Some(Err(err.into())))
                            }
                        } else {
                            // Nothing left to send.
                            break
                        }
                    }
                    Poll::Ready(Err(err)) => {
                        // Try to start a disconnect; if that fails as well, the disconnect
                        // error is reported instead of the original one.
                        if let Err(disconnect_err) =
                            this.inner.conn.start_disconnect(DisconnectReason::DisconnectRequested)
                        {
                            return Poll::Ready(Some(Err(disconnect_err.into())))
                        }
                        return Poll::Ready(Some(Err(err.into())))
                    }
                    Poll::Pending => {
                        conn_ready = false;
                        break
                    }
                }
            }

            // 3. Collect everything the primary protocol wants to send.
            loop {
                match this.primary.from_primary.poll_next_unpin(cx) {
                    Poll::Ready(Some(msg)) => {
                        this.inner.out_buffer.push_back(msg);
                    }
                    Poll::Ready(None) => {
                        // The sending half of the primary's outgoing channel is gone.
                        return Poll::Ready(None)
                    }
                    Poll::Pending => break,
                }
            }

            // 4. Collect everything the installed protocols want to send. Each protocol is
            //    temporarily removed from the list, polled until it is pending and then pushed
            //    back; iterating in reverse keeps `swap_remove` from skipping entries.
            for idx in (0..this.inner.protocols.len()).rev() {
                let mut proto = this.inner.protocols.swap_remove(idx);
                loop {
                    match proto.poll_next_unpin(cx) {
                        Poll::Ready(Some(Err(err))) => {
                            return Poll::Ready(Some(Err(P2PStreamError::Io(err).into())))
                        }
                        Poll::Ready(Some(Ok(msg))) => {
                            this.inner.out_buffer.push_back(msg);
                        }
                        // A finished protocol stream ends the whole satellite stream.
                        Poll::Ready(None) => return Poll::Ready(None),
                        Poll::Pending => {
                            this.inner.protocols.push(proto);
                            break
                        }
                    }
                }
            }

            // 5. Read incoming messages from the connection and route them by message id.
            let mut delegated = false;
            loop {
                match this.inner.conn.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(msg))) => {
                        delegated = true;
                        // The first byte is the message id that selects the capability.
                        let Some(offset) = msg.first().copied() else {
                            return Poll::Ready(Some(Err(
                                P2PStreamError::EmptyProtocolMessage.into()
                            )))
                        };
                        if let Some(cap) =
                            this.inner.conn.shared_capabilities().find_by_relative_offset(offset)
                        {
                            if cap == &this.primary.shared_cap {
                                // Hand over to the primary protocol; a send error is ignored.
                                let _ = this.primary.to_primary.send(msg);
                            } else {
                                // Hand over to the first installed protocol with this
                                // capability; the message is dropped if there is none.
                                for proto in &this.inner.protocols {
                                    if proto.shared_cap == *cap {
                                        proto.send_raw(msg);
                                        break
                                    }
                                }
                            }
                        } else {
                            return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(
                                offset,
                            )
                            .into())))
                        }
                    }
                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
                    Poll::Ready(None) => {
                        // The connection ended.
                        return Poll::Ready(None)
                    }
                    Poll::Pending => break,
                }
            }

            // Another pass only makes sense if the connection can take more messages and there
            // is either something buffered to send or a message was just delegated.
            if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) {
                return Poll::Pending
            }
        }
    }
}
578
579impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
580where
581 St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
582 Primary: Sink<T> + Unpin,
583 P2PStreamError: Into<<Primary as Sink<T>>::Error>,
584{
585 type Error = <Primary as Sink<T>>::Error;
586
587 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
588 let this = self.get_mut();
589 if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
590 return Poll::Ready(Err(err.into()))
591 }
592 if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
593 return Poll::Ready(Err(err))
594 }
595 Poll::Ready(Ok(()))
596 }
597
598 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
599 self.get_mut().primary.st.start_send_unpin(item)
600 }
601
602 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
603 self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into)
604 }
605
606 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
607 self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into)
608 }
609}
610
/// An installed protocol together with the channel used to feed it incoming messages.
struct ProtocolStream {
    /// Shared capability of this protocol; its relative message id offset is used for
    /// (un)masking.
    shared_cap: SharedCapability,
    /// Sender for messages delegated to this protocol.
    to_satellite: UnboundedSender<BytesMut>,
    /// Stream of outgoing messages produced by the protocol.
    satellite_st: Pin<Box<dyn Stream<Item = BytesMut> + Send>>,
}
618
619impl ProtocolStream {
620 #[inline]
622 fn mask_msg_id(&self, mut msg: BytesMut) -> Result<Bytes, io::Error> {
623 if msg.is_empty() {
624 return Err(io::ErrorKind::InvalidInput.into())
626 }
627 msg[0] = msg[0]
628 .checked_add(self.shared_cap.relative_message_id_offset())
629 .ok_or(io::ErrorKind::InvalidInput)?;
630 Ok(msg.freeze())
631 }
632
633 #[inline]
635 fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
636 if msg.is_empty() {
637 return Err(io::ErrorKind::InvalidInput.into())
639 }
640 msg[0] = msg[0]
641 .checked_sub(self.shared_cap.relative_message_id_offset())
642 .ok_or(io::ErrorKind::InvalidInput)?;
643 Ok(msg)
644 }
645
646 fn send_raw(&self, msg: BytesMut) {
648 let _ = self.unmask_id(msg).map(|msg| self.to_satellite.send(msg));
649 }
650}
651
652impl Stream for ProtocolStream {
653 type Item = Result<Bytes, io::Error>;
654
655 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
656 let this = self.get_mut();
657 let msg = ready!(this.satellite_st.as_mut().poll_next(cx));
658 Poll::Ready(msg.map(|msg| this.mask_msg_id(msg)))
659 }
660}
661
662impl fmt::Debug for ProtocolStream {
663 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
664 f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive()
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::{
            connect_passthrough, eth_handshake, eth_hello,
            proto::{test_hello, TestProtoMessage},
        },
        UnauthedP2PStream,
    };
    use reth_eth_wire_types::EthNetworkPrimitives;
    use tokio::{net::TcpListener, sync::oneshot};
    use tokio_util::codec::Decoder;

    /// The `eth` handshake completes when run through the multiplexer against a peer that uses
    /// a plain `eth` stream.
    #[tokio::test]
    async fn eth_satellite() {
        reth_tracing::init_test_tracing();
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let (status, fork_filter) = eth_handshake();
        let other_status = status;
        let other_fork_filter = fork_filter.clone();
        // Remote peer: accepts the connection and performs the p2p and eth handshakes directly
        // on the p2p stream, without a multiplexer.
        let _handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = crate::PassthroughCodec::default().framed(incoming);
            let (server_hello, _) = eth_hello();
            let (p2p_stream, _) =
                UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();

            let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream)
                .handshake::<EthNetworkPrimitives>(other_status, other_fork_filter)
                .await
                .unwrap();

            // Keep the task (and with it the connection) alive for a short while.
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        let conn = connect_passthrough(local_addr, eth_hello().0).await;
        let eth = conn.shared_capabilities().eth().unwrap().clone();

        // Local peer: runs the eth handshake through the multiplexer's proxy.
        let multiplexer = RlpxProtocolMultiplexer::new(conn);
        let _satellite = multiplexer
            .into_satellite_stream_with_handshake(
                eth.capability().as_ref(),
                move |proxy| async move {
                    UnauthedEthStream::new(proxy)
                        .handshake::<EthNetworkPrimitives>(status, fork_filter)
                        .await
                },
            )
            .await
            .unwrap();
    }

    /// Two peers with `eth` as the primary protocol exchange messages of an additionally
    /// installed test protocol.
    #[tokio::test(flavor = "multi_thread")]
    async fn eth_test_protocol_satellite() {
        reth_tracing::init_test_tracing();
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let (status, fork_filter) = eth_handshake();
        let other_status = status;
        let other_fork_filter = fork_filter.clone();
        // Remote peer: sends ping, expects pong, sends "hello", expects "good bye!" and answers
        // with "good bye!".
        let _handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = crate::PassthroughCodec::default().framed(incoming);
            let (server_hello, _) = test_hello();
            let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();

            let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
                .into_eth_satellite_stream::<EthNetworkPrimitives>(other_status, other_fork_filter)
                .await
                .unwrap();

            st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
                async_stream::stream! {
                    yield TestProtoMessage::ping().encoded();
                    let msg = conn.next().await.unwrap();
                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                    assert_eq!(msg, TestProtoMessage::pong());

                    yield TestProtoMessage::message("hello").encoded();
                    let msg = conn.next().await.unwrap();
                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                    assert_eq!(msg, TestProtoMessage::message("good bye!"));

                    yield TestProtoMessage::message("good bye!").encoded();

                    // Never finish: a finished protocol stream would end the satellite stream.
                    futures::future::pending::<()>().await;
                    unreachable!()
                }
            })
            .unwrap();

            // Polling the satellite stream is what drives the installed protocol.
            loop {
                let _ = st.next().await;
            }
        });

        let conn = connect_passthrough(local_addr, test_hello().0).await;
        let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
            .into_eth_satellite_stream::<EthNetworkPrimitives>(status, fork_filter)
            .await
            .unwrap();

        // Signals that the local protocol has seen the final message.
        let (tx, mut rx) = oneshot::channel();

        // Local peer: expects ping, answers pong, expects "hello", answers "good bye!" and
        // expects a final "good bye!".
        st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
            async_stream::stream! {
                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::ping());

                yield TestProtoMessage::pong().encoded();

                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::message("hello"));

                yield TestProtoMessage::message("good bye!").encoded();

                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::message("good bye!"));

                tx.send(()).unwrap();

                // Never finish: a finished protocol stream would end the satellite stream.
                futures::future::pending::<()>().await;
                unreachable!()
            }
        })
        .unwrap();

        // Drive the satellite stream until the local protocol reports completion.
        loop {
            tokio::select! {
                _ = &mut rx => {
                    break
                }
                _ = st.next() => {
                }
            }
        }
    }
}