1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 session::BlockRangeInfo,
10 FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
17 NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_peers::PeerId;
22use reth_network_types::{PeerAddr, PeerKind};
23use reth_primitives_traits::Block;
24use std::{
25 collections::{HashMap, VecDeque},
26 fmt,
27 net::{IpAddr, SocketAddr},
28 ops::Deref,
29 sync::{
30 atomic::{AtomicU64, AtomicUsize},
31 Arc,
32 },
33 task::{Context, Poll},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace};
37
38const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
40
41pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
43
44impl BlockNumReader {
45 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
47 Self(Box::new(reader))
48 }
49}
50
51impl fmt::Debug for BlockNumReader {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
54 }
55}
56
57impl Deref for BlockNumReader {
58 type Target = Box<dyn reth_storage_api::BlockNumReader>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65#[derive(Debug)]
76pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
77 active_peers: HashMap<PeerId, ActivePeer<N>>,
79 peers_manager: PeersManager,
81 queued_messages: VecDeque<StateAction<N>>,
83 client: BlockNumReader,
88 discovery: Discovery,
90 state_fetcher: StateFetcher<N>,
96}
97
98impl<N: NetworkPrimitives> NetworkState<N> {
99 pub(crate) fn new(
101 client: BlockNumReader,
102 discovery: Discovery,
103 peers_manager: PeersManager,
104 num_active_peers: Arc<AtomicUsize>,
105 ) -> Self {
106 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
107 Self {
108 active_peers: Default::default(),
109 peers_manager,
110 queued_messages: Default::default(),
111 client,
112 discovery,
113 state_fetcher,
114 }
115 }
116
117 pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
119 &mut self.peers_manager
120 }
121
122 pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
124 &mut self.discovery
125 }
126
127 pub(crate) const fn peers(&self) -> &PeersManager {
129 &self.peers_manager
130 }
131
132 pub(crate) fn fetch_client(&self) -> FetchClient<N> {
134 self.state_fetcher.client()
135 }
136
137 pub fn num_active_peers(&self) -> usize {
139 self.active_peers.len()
140 }
141
142 pub(crate) fn on_session_activated(
147 &mut self,
148 peer: PeerId,
149 capabilities: Arc<Capabilities>,
150 status: Arc<UnifiedStatus>,
151 request_tx: PeerRequestSender<PeerRequest<N>>,
152 timeout: Arc<AtomicU64>,
153 range_info: Option<BlockRangeInfo>,
154 ) {
155 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
156
157 let block_number =
159 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
160 self.state_fetcher.new_active_peer(
161 peer,
162 status.blockhash,
163 block_number,
164 Arc::clone(&capabilities),
165 timeout,
166 range_info,
167 );
168
169 self.active_peers.insert(
170 peer,
171 ActivePeer {
172 best_hash: status.blockhash,
173 capabilities,
174 request_tx,
175 pending_response: None,
176 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
177 },
178 );
179 }
180
181 pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
185 self.active_peers.remove(&peer);
186 self.state_fetcher.on_session_closed(&peer);
187 }
188
189 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
198 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
201
202 let number = msg.block.block().header().number();
203 let mut count = 0;
204
205 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
207 peers.shuffle(&mut rand::rng());
208
209 for (peer_id, peer) in peers {
210 if peer.blocks.contains(&msg.hash) {
211 continue
213 }
214
215 if count < num_propagate {
217 self.queued_messages
218 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
219
220 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
222 peer.best_hash = msg.hash;
223 }
224
225 peer.blocks.insert(msg.hash);
227
228 count += 1;
229 }
230
231 if count >= num_propagate {
232 break
233 }
234 }
235 }
236
237 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
240 let number = msg.block.block().header().number();
241 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
242 for (peer_id, peer) in &mut self.active_peers {
243 if peer.blocks.contains(&msg.hash) {
244 continue
246 }
247
248 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
249 peer.best_hash = msg.hash;
250 }
251
252 self.queued_messages.push_back(StateAction::NewBlockHashes {
253 peer_id: *peer_id,
254 hashes: hashes.clone(),
255 });
256 }
257 }
258
259 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
261 if let Some(peer) = self.active_peers.get_mut(peer_id) {
262 peer.best_hash = hash;
263 }
264 self.state_fetcher.update_peer_block(peer_id, hash, number);
265 }
266
267 pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
269 self.discovery.update_fork_id(fork_id)
270 }
271
272 pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
276 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
278 peer.blocks.insert(hash);
279 }
280 }
281
282 pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
284 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
286 peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
287 }
288 }
289
290 pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
292 trace!(target: "net", ?ip, "Banning discovery");
293 self.discovery.ban_ip(ip)
294 }
295
296 pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
298 trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
299 self.discovery.ban(peer_id, ip)
300 }
301
302 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
304 self.peers_manager.add_trusted_peer_id(peer_id)
305 }
306
307 pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
309 self.peers_manager.add_peer_kind(peer_id, Some(kind), addr, None)
310 }
311
312 pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
314 self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
315 }
316
317 pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
319 match kind {
320 PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
321 PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
322 }
323 }
324
325 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
327 match event {
328 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
329 self.queued_messages.push_back(StateAction::DiscoveredNode {
330 peer_id,
331 addr,
332 fork_id,
333 });
334 }
335 DiscoveryEvent::EnrForkId(record, fork_id) => {
336 let peer_id = record.id;
337 let tcp_addr = record.tcp_addr();
338 if tcp_addr.port() == 0 {
339 return
340 }
341 let udp_addr = record.udp_addr();
342 let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
343 self.queued_messages.push_back(StateAction::DiscoveredEnrForkId {
344 peer_id,
345 addr,
346 fork_id,
347 });
348 }
349 }
350 }
351
352 fn on_peer_action(&mut self, action: PeerAction) {
354 match action {
355 PeerAction::Connect { peer_id, remote_addr } => {
356 self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
357 }
358 PeerAction::Disconnect { peer_id, reason } => {
359 self.state_fetcher.on_pending_disconnect(&peer_id);
360 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
361 }
362 PeerAction::DisconnectBannedIncoming { peer_id } |
363 PeerAction::DisconnectUntrustedIncoming { peer_id } => {
364 self.state_fetcher.on_pending_disconnect(&peer_id);
365 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
366 }
367 PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
368 self.ban_discovery(peer_id, ip_addr)
369 }
370 PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
371 PeerAction::PeerAdded(peer_id) => {
372 self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
373 }
374 PeerAction::PeerRemoved(peer_id) => {
375 self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
376 }
377 PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
378 }
379 }
380
381 fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
386 if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
387 let (request, response) = match request {
388 BlockRequest::GetBlockHeaders(request) => {
389 let (response, rx) = oneshot::channel();
390 let request = PeerRequest::GetBlockHeaders { request, response };
391 let response = PeerResponse::BlockHeaders { response: rx };
392 (request, response)
393 }
394 BlockRequest::GetBlockBodies(request) => {
395 let (response, rx) = oneshot::channel();
396 let request = PeerRequest::GetBlockBodies { request, response };
397 let response = PeerResponse::BlockBodies { response: rx };
398 (request, response)
399 }
400 };
401 let _ = peer.request_tx.to_session_tx.try_send(request);
402 peer.pending_response = Some(response);
403 }
404 }
405
406 fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
408 match outcome {
409 BlockResponseOutcome::Request(peer, request) => {
410 self.handle_block_request(peer, request);
411 }
412 BlockResponseOutcome::BadResponse(peer, reputation_change) => {
413 self.peers_manager.apply_reputation_change(&peer, reputation_change);
414 }
415 }
416 }
417
418 fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
424 let outcome = match resp {
425 PeerResponseResult::BlockHeaders(res) => {
426 self.state_fetcher.on_block_headers_response(peer, res)
427 }
428 PeerResponseResult::BlockBodies(res) => {
429 self.state_fetcher.on_block_bodies_response(peer, res)
430 }
431 _ => None,
432 };
433
434 if let Some(outcome) = outcome {
435 self.on_block_response_outcome(outcome);
436 }
437 }
438
439 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
441 loop {
442 if let Some(message) = self.queued_messages.pop_front() {
444 return Poll::Ready(message)
445 }
446
447 while let Poll::Ready(discovery) = self.discovery.poll(cx) {
448 self.on_discovery_event(discovery);
449 }
450
451 while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
452 match action {
453 FetchAction::BlockRequest { peer_id, request } => {
454 self.handle_block_request(peer_id, request)
455 }
456 }
457 }
458
459 loop {
460 let mut closed_sessions = Vec::new();
462 let mut received_responses = Vec::new();
463
464 for (id, peer) in &mut self.active_peers {
466 let Some(mut response) = peer.pending_response.take() else { continue };
467 match response.poll(cx) {
468 Poll::Ready(res) => {
469 if res.err().is_some_and(|err| err.is_channel_closed()) {
471 debug!(
472 target: "net",
473 ?id,
474 "Request canceled, response channel from session closed."
475 );
476 closed_sessions.push(*id);
482 } else {
483 received_responses.push((*id, res));
484 }
485 }
486 Poll::Pending => {
487 peer.pending_response = Some(response);
489 }
490 };
491 }
492
493 for peer in closed_sessions {
494 self.on_session_closed(peer)
495 }
496
497 if received_responses.is_empty() {
498 break;
499 }
500
501 for (peer_id, resp) in received_responses {
502 self.on_eth_response(peer_id, resp);
503 }
504 }
505
506 while let Poll::Ready(action) = self.peers_manager.poll(cx) {
508 self.on_peer_action(action);
509 }
510
511 if self.queued_messages.is_empty() {
514 return Poll::Pending
515 }
516 }
517 }
518}
519
520#[derive(Debug)]
524pub(crate) struct ActivePeer<N: NetworkPrimitives> {
525 pub(crate) best_hash: B256,
527 #[expect(dead_code)]
529 pub(crate) capabilities: Arc<Capabilities>,
530 pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
532 pub(crate) pending_response: Option<PeerResponse<N>>,
534 pub(crate) blocks: LruCache<B256>,
536}
537
538#[derive(Debug)]
540pub(crate) enum StateAction<N: NetworkPrimitives> {
541 NewBlock {
543 peer_id: PeerId,
545 block: NewBlockMessage<N::NewBlockPayload>,
547 },
548 NewBlockHashes {
549 peer_id: PeerId,
551 hashes: NewBlockHashes,
553 },
554 Connect { remote_addr: SocketAddr, peer_id: PeerId },
556 Disconnect {
558 peer_id: PeerId,
559 reason: Option<DisconnectReason>,
561 },
562 DiscoveredEnrForkId {
564 peer_id: PeerId,
565 addr: PeerAddr,
567 fork_id: ForkId,
569 },
570 DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
572 PeerAdded(PeerId),
574 PeerRemoved(PeerId),
576}
577
578#[cfg(test)]
579mod tests {
580 use crate::{
581 discovery::Discovery,
582 fetch::StateFetcher,
583 peers::PeersManager,
584 state::{BlockNumReader, NetworkState},
585 PeerRequest,
586 };
587 use alloy_consensus::Header;
588 use alloy_primitives::B256;
589 use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
590 use reth_ethereum_primitives::BlockBody;
591 use reth_network_api::PeerRequestSender;
592 use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
593 use reth_network_peers::PeerId;
594 use reth_storage_api::noop::NoopProvider;
595 use std::{
596 future::poll_fn,
597 sync::{atomic::AtomicU64, Arc},
598 };
599 use tokio::sync::mpsc;
600 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
601
602 fn state() -> NetworkState<EthNetworkPrimitives> {
604 let peers = PeersManager::default();
605 let handle = peers.handle();
606 NetworkState {
607 active_peers: Default::default(),
608 peers_manager: Default::default(),
609 queued_messages: Default::default(),
610 client: BlockNumReader(Box::new(NoopProvider::default())),
611 discovery: Discovery::noop(),
612 state_fetcher: StateFetcher::new(handle, Default::default()),
613 }
614 }
615
616 fn capabilities() -> Arc<Capabilities> {
617 Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
618 }
619
620 #[tokio::test(flavor = "multi_thread")]
623 async fn test_dropped_active_session() {
624 let mut state = state();
625 let client = state.fetch_client();
626
627 let peer_id = PeerId::random();
628 let (tx, session_rx) = mpsc::channel(1);
629 let peer_tx = PeerRequestSender::new(peer_id, tx);
630
631 state.on_session_activated(
632 peer_id,
633 capabilities(),
634 Arc::default(),
635 peer_tx,
636 Arc::new(AtomicU64::new(1)),
637 None,
638 );
639
640 assert!(state.active_peers.contains_key(&peer_id));
641
642 let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
643
644 let body_response = body.clone();
645
646 tokio::task::spawn(async move {
648 let mut stream = ReceiverStream::new(session_rx);
649 let resp = stream.next().await.unwrap();
650 match resp {
651 PeerRequest::GetBlockBodies { response, .. } => {
652 response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
653 }
654 _ => unreachable!(),
655 }
656
657 let _resp = stream.next().await.unwrap();
659 });
660
661 tokio::task::spawn(async move {
663 loop {
664 poll_fn(|cx| state.poll(cx)).await;
665 }
666 });
667
668 let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
670 assert_eq!(peer, peer_id);
671 assert_eq!(bodies, vec![body]);
672
673 let resp = client.get_block_bodies(vec![B256::random()]).await;
674 assert!(resp.is_err());
675 assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
676 }
677}