1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 session::BlockRangeInfo,
10 FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, GetReceipts70,
17 NetworkPrimitives, NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_p2p::receipts::client::ReceiptsResponse;
22use reth_network_peers::PeerId;
23use reth_network_types::{PeerAddr, PeerKind};
24use reth_primitives_traits::Block;
25use std::{
26 collections::{HashMap, VecDeque},
27 fmt,
28 net::{IpAddr, SocketAddr},
29 ops::Deref,
30 sync::{
31 atomic::{AtomicU64, AtomicUsize},
32 Arc,
33 },
34 task::{Context, Poll},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace};
38
/// Capacity passed to `LruCache::new` for each active peer's cache of seen block hashes.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
41
42pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
44
45impl BlockNumReader {
46 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
48 Self(Box::new(reader))
49 }
50}
51
52impl fmt::Debug for BlockNumReader {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
55 }
56}
57
58impl Deref for BlockNumReader {
59 type Target = Box<dyn reth_storage_api::BlockNumReader>;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
66#[derive(Debug)]
77pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
78 active_peers: HashMap<PeerId, ActivePeer<N>>,
80 peers_manager: PeersManager,
82 queued_messages: VecDeque<StateAction<N>>,
84 client: BlockNumReader,
89 discovery: Discovery,
91 state_fetcher: StateFetcher<N>,
97}
98
99impl<N: NetworkPrimitives> NetworkState<N> {
100 pub(crate) fn new(
102 client: BlockNumReader,
103 discovery: Discovery,
104 peers_manager: PeersManager,
105 num_active_peers: Arc<AtomicUsize>,
106 ) -> Self {
107 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
108 Self {
109 active_peers: Default::default(),
110 peers_manager,
111 queued_messages: Default::default(),
112 client,
113 discovery,
114 state_fetcher,
115 }
116 }
117
118 pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
120 &mut self.peers_manager
121 }
122
123 pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
125 &mut self.discovery
126 }
127
128 pub(crate) const fn peers(&self) -> &PeersManager {
130 &self.peers_manager
131 }
132
133 pub(crate) fn fetch_client(&self) -> FetchClient<N> {
135 self.state_fetcher.client()
136 }
137
138 pub fn num_active_peers(&self) -> usize {
140 self.active_peers.len()
141 }
142
143 pub(crate) fn on_session_activated(
148 &mut self,
149 peer: PeerId,
150 capabilities: Arc<Capabilities>,
151 status: Arc<UnifiedStatus>,
152 request_tx: PeerRequestSender<PeerRequest<N>>,
153 timeout: Arc<AtomicU64>,
154 range_info: Option<BlockRangeInfo>,
155 ) {
156 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
157
158 let block_number =
160 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
161 self.state_fetcher.new_active_peer(
162 peer,
163 status.blockhash,
164 block_number,
165 Arc::clone(&capabilities),
166 timeout,
167 range_info,
168 );
169
170 self.active_peers.insert(
171 peer,
172 ActivePeer {
173 best_hash: status.blockhash,
174 capabilities,
175 request_tx,
176 pending_response: None,
177 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
178 },
179 );
180 }
181
182 pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
186 self.active_peers.remove(&peer);
187 self.state_fetcher.on_session_closed(&peer);
188 }
189
190 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
199 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
202
203 let number = msg.block.block().header().number();
204 let mut count = 0;
205
206 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
208 peers.shuffle(&mut rand::rng());
209
210 for (peer_id, peer) in peers {
211 if peer.blocks.contains(&msg.hash) {
212 continue
214 }
215
216 if count < num_propagate {
218 self.queued_messages
219 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
220
221 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
223 peer.best_hash = msg.hash;
224 }
225
226 peer.blocks.insert(msg.hash);
228
229 count += 1;
230 }
231
232 if count >= num_propagate {
233 break
234 }
235 }
236 }
237
238 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
241 let number = msg.block.block().header().number();
242 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
243 for (peer_id, peer) in &mut self.active_peers {
244 if peer.blocks.contains(&msg.hash) {
245 continue
247 }
248
249 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
250 peer.best_hash = msg.hash;
251 }
252
253 self.queued_messages.push_back(StateAction::NewBlockHashes {
254 peer_id: *peer_id,
255 hashes: hashes.clone(),
256 });
257 }
258 }
259
260 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
262 if let Some(peer) = self.active_peers.get_mut(peer_id) {
263 peer.best_hash = hash;
264 }
265 self.state_fetcher.update_peer_block(peer_id, hash, number);
266 }
267
268 pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
270 self.discovery.update_fork_id(fork_id)
271 }
272
273 pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
277 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
279 peer.blocks.insert(hash);
280 }
281 }
282
283 pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
285 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
287 peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
288 }
289 }
290
291 pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
293 trace!(target: "net", ?ip, "Banning discovery");
294 self.discovery.ban_ip(ip)
295 }
296
297 pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
299 trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
300 self.discovery.ban(peer_id, ip)
301 }
302
303 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
305 self.peers_manager.add_trusted_peer_id(peer_id)
306 }
307
308 pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
310 self.peers_manager.add_peer_kind(peer_id, Some(kind), addr, None)
311 }
312
313 pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
315 self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
316 }
317
318 pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
320 match kind {
321 PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
322 PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
323 }
324 }
325
326 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
328 match event {
329 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
330 self.queued_messages.push_back(StateAction::DiscoveredNode {
331 peer_id,
332 addr,
333 fork_id,
334 });
335 }
336 DiscoveryEvent::EnrForkId(record, fork_id) => {
337 let peer_id = record.id;
338 let tcp_addr = record.tcp_addr();
339 if tcp_addr.port() == 0 {
340 return
341 }
342 let udp_addr = record.udp_addr();
343 let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
344 self.queued_messages.push_back(StateAction::DiscoveredEnrForkId {
345 peer_id,
346 addr,
347 fork_id,
348 });
349 }
350 }
351 }
352
353 fn on_peer_action(&mut self, action: PeerAction) {
355 match action {
356 PeerAction::Connect { peer_id, remote_addr } => {
357 self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
358 }
359 PeerAction::Disconnect { peer_id, reason } => {
360 self.state_fetcher.on_pending_disconnect(&peer_id);
361 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
362 }
363 PeerAction::DisconnectBannedIncoming { peer_id } |
364 PeerAction::DisconnectUntrustedIncoming { peer_id } => {
365 self.state_fetcher.on_pending_disconnect(&peer_id);
366 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
367 }
368 PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
369 self.ban_discovery(peer_id, ip_addr)
370 }
371 PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
372 PeerAction::PeerAdded(peer_id) => {
373 self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
374 }
375 PeerAction::PeerRemoved(peer_id) => {
376 self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
377 }
378 PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
379 }
380 }
381
382 fn handle_block_request(&mut self, peer_id: PeerId, request: BlockRequest) {
387 if let Some(ref mut peer) = self.active_peers.get_mut(&peer_id) {
388 let (request, response) = match request {
389 BlockRequest::GetBlockHeaders(request) => {
390 let (response, rx) = oneshot::channel();
391 let request = PeerRequest::GetBlockHeaders { request, response };
392 let response = PeerResponse::BlockHeaders { response: rx };
393 (request, response)
394 }
395 BlockRequest::GetBlockBodies(request) => {
396 let (response, rx) = oneshot::channel();
397 let request = PeerRequest::GetBlockBodies { request, response };
398 let response = PeerResponse::BlockBodies { response: rx };
399 (request, response)
400 }
401 BlockRequest::GetReceipts(request) => {
402 if peer.capabilities.supports_eth_v70() {
403 let (response, rx) = oneshot::channel();
404 let request = PeerRequest::GetReceipts70 {
405 request: GetReceipts70 {
406 first_block_receipt_index: 0,
407 block_hashes: request.0,
408 },
409 response,
410 };
411 let response = PeerResponse::Receipts70 { response: rx };
412 (request, response)
413 } else if peer.capabilities.supports_eth_v69() {
414 let (response, rx) = oneshot::channel();
415 let request = PeerRequest::GetReceipts69 { request, response };
416 let response = PeerResponse::Receipts69 { response: rx };
417 (request, response)
418 } else {
419 let (response, rx) = oneshot::channel();
420 let request = PeerRequest::GetReceipts { request, response };
421 let response = PeerResponse::Receipts { response: rx };
422 (request, response)
423 }
424 }
425 };
426 let _ = peer.request_tx.to_session_tx.try_send(request);
427 peer.pending_response = Some(response);
428 }
429 }
430
431 fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
433 match outcome {
434 BlockResponseOutcome::Request(peer, request) => {
435 self.handle_block_request(peer, request);
436 }
437 BlockResponseOutcome::BadResponse(peer, reputation_change) => {
438 self.peers_manager.apply_reputation_change(&peer, reputation_change);
439 }
440 }
441 }
442
443 fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
449 let outcome = match resp {
450 PeerResponseResult::BlockHeaders(res) => {
451 self.state_fetcher.on_block_headers_response(peer, res)
452 }
453 PeerResponseResult::BlockBodies(res) => {
454 self.state_fetcher.on_block_bodies_response(peer, res)
455 }
456 PeerResponseResult::Receipts(res) => {
457 let normalized = res.map(|blocks| {
459 let receipts = blocks
460 .into_iter()
461 .map(|block_receipts| {
462 block_receipts.into_iter().map(|rwb| rwb.receipt).collect()
463 })
464 .collect();
465 ReceiptsResponse::new(receipts)
466 });
467 self.state_fetcher.on_receipts_response(peer, normalized)
468 }
469 PeerResponseResult::Receipts69(res) => {
470 let normalized = res.map(ReceiptsResponse::new);
471 self.state_fetcher.on_receipts_response(peer, normalized)
472 }
473 PeerResponseResult::Receipts70(res) => {
474 let normalized = res.map(ReceiptsResponse::from);
475 self.state_fetcher.on_receipts_response(peer, normalized)
476 }
477 _ => None,
478 };
479
480 if let Some(outcome) = outcome {
481 self.on_block_response_outcome(outcome);
482 }
483 }
484
485 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
487 loop {
488 if let Some(message) = self.queued_messages.pop_front() {
490 return Poll::Ready(message)
491 }
492
493 while let Poll::Ready(discovery) = self.discovery.poll(cx) {
494 self.on_discovery_event(discovery);
495 }
496
497 while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
498 match action {
499 FetchAction::BlockRequest { peer_id, request } => {
500 self.handle_block_request(peer_id, request)
501 }
502 }
503 }
504
505 loop {
506 let mut closed_sessions = Vec::new();
508 let mut received_responses = Vec::new();
509
510 for (id, peer) in &mut self.active_peers {
512 let Some(mut response) = peer.pending_response.take() else { continue };
513 match response.poll(cx) {
514 Poll::Ready(res) => {
515 if res.err().is_some_and(|err| err.is_channel_closed()) {
517 debug!(
518 target: "net",
519 ?id,
520 "Request canceled, response channel from session closed."
521 );
522 closed_sessions.push(*id);
528 } else {
529 received_responses.push((*id, res));
530 }
531 }
532 Poll::Pending => {
533 peer.pending_response = Some(response);
535 }
536 };
537 }
538
539 for peer in closed_sessions {
540 self.on_session_closed(peer)
541 }
542
543 if received_responses.is_empty() {
544 break;
545 }
546
547 for (peer_id, resp) in received_responses {
548 self.on_eth_response(peer_id, resp);
549 }
550 }
551
552 while let Poll::Ready(action) = self.peers_manager.poll(cx) {
554 self.on_peer_action(action);
555 }
556
557 if self.queued_messages.is_empty() {
560 return Poll::Pending
561 }
562 }
563 }
564}
565
566#[derive(Debug)]
570pub(crate) struct ActivePeer<N: NetworkPrimitives> {
571 pub(crate) best_hash: B256,
573 pub(crate) capabilities: Arc<Capabilities>,
575 pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
577 pub(crate) pending_response: Option<PeerResponse<N>>,
579 pub(crate) blocks: LruCache<B256>,
581}
582
583#[derive(Debug)]
585pub(crate) enum StateAction<N: NetworkPrimitives> {
586 NewBlock {
588 peer_id: PeerId,
590 block: NewBlockMessage<N::NewBlockPayload>,
592 },
593 NewBlockHashes {
594 peer_id: PeerId,
596 hashes: NewBlockHashes,
598 },
599 Connect { remote_addr: SocketAddr, peer_id: PeerId },
601 Disconnect {
603 peer_id: PeerId,
604 reason: Option<DisconnectReason>,
606 },
607 DiscoveredEnrForkId {
609 peer_id: PeerId,
610 addr: PeerAddr,
612 fork_id: ForkId,
614 },
615 DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
617 PeerAdded(PeerId),
619 PeerRemoved(PeerId),
621}
622
623#[cfg(test)]
624mod tests {
625 use crate::{
626 discovery::Discovery,
627 fetch::StateFetcher,
628 peers::PeersManager,
629 state::{BlockNumReader, NetworkState},
630 PeerRequest,
631 };
632 use alloy_consensus::Header;
633 use alloy_primitives::B256;
634 use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
635 use reth_ethereum_primitives::BlockBody;
636 use reth_network_api::PeerRequestSender;
637 use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
638 use reth_network_peers::PeerId;
639 use reth_storage_api::noop::NoopProvider;
640 use std::{
641 future::poll_fn,
642 sync::{atomic::AtomicU64, Arc},
643 };
644 use tokio::sync::mpsc;
645 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
646
647 fn state() -> NetworkState<EthNetworkPrimitives> {
649 let peers = PeersManager::default();
650 let handle = peers.handle();
651 NetworkState {
652 active_peers: Default::default(),
653 peers_manager: Default::default(),
654 queued_messages: Default::default(),
655 client: BlockNumReader(Box::new(NoopProvider::default())),
656 discovery: Discovery::noop(),
657 state_fetcher: StateFetcher::new(handle, Default::default()),
658 }
659 }
660
661 fn capabilities() -> Arc<Capabilities> {
662 Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
663 }
664
665 #[tokio::test(flavor = "multi_thread")]
668 async fn test_dropped_active_session() {
669 let mut state = state();
670 let client = state.fetch_client();
671
672 let peer_id = PeerId::random();
673 let (tx, session_rx) = mpsc::channel(1);
674 let peer_tx = PeerRequestSender::new(peer_id, tx);
675
676 state.on_session_activated(
677 peer_id,
678 capabilities(),
679 Arc::default(),
680 peer_tx,
681 Arc::new(AtomicU64::new(1)),
682 None,
683 );
684
685 assert!(state.active_peers.contains_key(&peer_id));
686
687 let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
688
689 let body_response = body.clone();
690
691 tokio::task::spawn(async move {
693 let mut stream = ReceiverStream::new(session_rx);
694 let resp = stream.next().await.unwrap();
695 match resp {
696 PeerRequest::GetBlockBodies { response, .. } => {
697 response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
698 }
699 _ => unreachable!(),
700 }
701
702 let _resp = stream.next().await.unwrap();
704 });
705
706 tokio::task::spawn(async move {
708 loop {
709 poll_fn(|cx| state.poll(cx)).await;
710 }
711 });
712
713 let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
715 assert_eq!(peer, peer_id);
716 assert_eq!(bodies, vec![body]);
717
718 let resp = client.get_block_bodies(vec![B256::random()]).await;
719 assert!(resp.is_err());
720 assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
721 }
722}