1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 session::BlockRangeInfo,
10 FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
17 NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_peers::PeerId;
22use reth_network_types::{PeerAddr, PeerKind};
23use reth_primitives_traits::Block;
24use std::{
25 collections::{HashMap, VecDeque},
26 fmt,
27 net::{IpAddr, SocketAddr},
28 ops::Deref,
29 sync::{
30 atomic::{AtomicU64, AtomicUsize},
31 Arc,
32 },
33 task::{Context, Poll},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace};
37
/// Capacity handed to [`LruCache::new`] when creating the per-peer cache of seen block hashes
/// (the `blocks` field of an active peer).
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
40
/// Newtype around a boxed [`reth_storage_api::BlockNumReader`] trait object.
///
/// `NetworkState` keeps one as its `client` and uses it to look up the block number that belongs
/// to a peer's reported block hash.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
43
44impl BlockNumReader {
45 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
47 Self(Box::new(reader))
48 }
49}
50
51impl fmt::Debug for BlockNumReader {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
54 }
55}
56
impl Deref for BlockNumReader {
    // The target is the `Box` itself, not the trait object stored inside it.
    type Target = Box<dyn reth_storage_api::BlockNumReader>;

    /// Returns a reference to the wrapped boxed reader.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
64
/// Bookkeeping for the network: active peers, the peers manager, discovery and the state fetcher.
///
/// The state is driven through [`NetworkState::poll`], which yields buffered [`StateAction`]s.
#[derive(Debug)]
pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Peers with an active session, keyed by peer id. Entries are added in
    /// `on_session_activated` and removed in `on_session_closed`.
    active_peers: HashMap<PeerId, ActivePeer<N>>,
    /// Peers manager; polled for [`PeerAction`]s.
    peers_manager: PeersManager,
    /// Actions waiting to be returned from `poll`, in FIFO order.
    queued_messages: VecDeque<StateAction<N>>,
    /// Used to resolve the block number for the block hash a peer reports in its status.
    client: BlockNumReader,
    /// Discovery; polled for [`DiscoveryEvent`]s and used to ban peers/IPs and update the fork id.
    discovery: Discovery,
    /// Fetcher that is told about peer/session changes and polled for [`FetchAction`]s.
    state_fetcher: StateFetcher<N>,
}
97
98impl<N: NetworkPrimitives> NetworkState<N> {
99 pub(crate) fn new(
101 client: BlockNumReader,
102 discovery: Discovery,
103 peers_manager: PeersManager,
104 num_active_peers: Arc<AtomicUsize>,
105 ) -> Self {
106 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
107 Self {
108 active_peers: Default::default(),
109 peers_manager,
110 queued_messages: Default::default(),
111 client,
112 discovery,
113 state_fetcher,
114 }
115 }
116
    /// Returns a mutable reference to the [`PeersManager`].
    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
        &mut self.peers_manager
    }
121
    /// Returns a mutable reference to [`Discovery`].
    pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
        &mut self.discovery
    }
126
    /// Returns a shared reference to the [`PeersManager`].
    pub(crate) const fn peers(&self) -> &PeersManager {
        &self.peers_manager
    }
131
    /// Returns a new [`FetchClient`] obtained from the state fetcher.
    pub(crate) fn fetch_client(&self) -> FetchClient<N> {
        self.state_fetcher.client()
    }
136
    /// Returns how many peers are currently tracked as active.
    pub fn num_active_peers(&self) -> usize {
        self.active_peers.len()
    }
141
142 pub(crate) fn on_session_activated(
147 &mut self,
148 peer: PeerId,
149 capabilities: Arc<Capabilities>,
150 status: Arc<UnifiedStatus>,
151 request_tx: PeerRequestSender<PeerRequest<N>>,
152 timeout: Arc<AtomicU64>,
153 range_info: Option<BlockRangeInfo>,
154 ) {
155 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
156
157 let block_number =
159 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
160 self.state_fetcher.new_active_peer(
161 peer,
162 status.blockhash,
163 block_number,
164 timeout,
165 range_info,
166 );
167
168 self.active_peers.insert(
169 peer,
170 ActivePeer {
171 best_hash: status.blockhash,
172 capabilities,
173 request_tx,
174 pending_response: None,
175 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
176 },
177 );
178 }
179
180 pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
184 self.active_peers.remove(&peer);
185 self.state_fetcher.on_session_closed(&peer);
186 }
187
188 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
197 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
200
201 let number = msg.block.block().header().number();
202 let mut count = 0;
203
204 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
206 peers.shuffle(&mut rand::rng());
207
208 for (peer_id, peer) in peers {
209 if peer.blocks.contains(&msg.hash) {
210 continue
212 }
213
214 if count < num_propagate {
216 self.queued_messages
217 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
218
219 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
221 peer.best_hash = msg.hash;
222 }
223
224 peer.blocks.insert(msg.hash);
226
227 count += 1;
228 }
229
230 if count >= num_propagate {
231 break
232 }
233 }
234 }
235
236 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
239 let number = msg.block.block().header().number();
240 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
241 for (peer_id, peer) in &mut self.active_peers {
242 if peer.blocks.contains(&msg.hash) {
243 continue
245 }
246
247 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
248 peer.best_hash = msg.hash;
249 }
250
251 self.queued_messages.push_back(StateAction::NewBlockHashes {
252 peer_id: *peer_id,
253 hashes: hashes.clone(),
254 });
255 }
256 }
257
258 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
260 if let Some(peer) = self.active_peers.get_mut(peer_id) {
261 peer.best_hash = hash;
262 }
263 self.state_fetcher.update_peer_block(peer_id, hash, number);
264 }
265
    /// Forwards the new [`ForkId`] to discovery.
    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
        self.discovery.update_fork_id(fork_id)
    }
270
271 pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
275 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
277 peer.blocks.insert(hash);
278 }
279 }
280
281 pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
283 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
285 peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
286 }
287 }
288
    /// Emits a trace event and asks discovery to ban `ip`.
    pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
        trace!(target: "net", ?ip, "Banning discovery");
        self.discovery.ban_ip(ip)
    }
294
    /// Emits a trace event and asks discovery to ban the peer id together with its `ip`.
    pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
        trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
        self.discovery.ban(peer_id, ip)
    }
300
    /// Forwards to [`PeersManager::add_trusted_peer_id`].
    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
        self.peers_manager.add_trusted_peer_id(peer_id)
    }
305
    /// Forwards to [`PeersManager::add_peer_kind`] with the given kind and address.
    pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        // NOTE(review): the trailing `None` is presumably an optional fork id; confirm against
        // `PeersManager::add_peer_kind`.
        self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
    }
310
    /// Forwards to [`PeersManager::add_and_connect_kind`] with the given kind and address.
    pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        // NOTE(review): the trailing `None` is presumably an optional fork id; confirm against
        // `PeersManager::add_and_connect_kind`.
        self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
    }
315
316 pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
318 match kind {
319 PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
320 PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
321 }
322 }
323
324 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
326 match event {
327 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
328 self.queued_messages.push_back(StateAction::DiscoveredNode {
329 peer_id,
330 addr,
331 fork_id,
332 });
333 }
334 DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
335 self.queued_messages
336 .push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
337 }
338 }
339 }
340
341 fn on_peer_action(&mut self, action: PeerAction) {
343 match action {
344 PeerAction::Connect { peer_id, remote_addr } => {
345 self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
346 }
347 PeerAction::Disconnect { peer_id, reason } => {
348 self.state_fetcher.on_pending_disconnect(&peer_id);
349 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
350 }
351 PeerAction::DisconnectBannedIncoming { peer_id } |
352 PeerAction::DisconnectUntrustedIncoming { peer_id } => {
353 self.state_fetcher.on_pending_disconnect(&peer_id);
354 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
355 }
356 PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
357 self.ban_discovery(peer_id, ip_addr)
358 }
359 PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
360 PeerAction::PeerAdded(peer_id) => {
361 self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
362 }
363 PeerAction::PeerRemoved(peer_id) => {
364 self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
365 }
366 PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
367 }
368 }
369
370 fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
375 if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
376 let (request, response) = match request {
377 BlockRequest::GetBlockHeaders(request) => {
378 let (response, rx) = oneshot::channel();
379 let request = PeerRequest::GetBlockHeaders { request, response };
380 let response = PeerResponse::BlockHeaders { response: rx };
381 (request, response)
382 }
383 BlockRequest::GetBlockBodies(request) => {
384 let (response, rx) = oneshot::channel();
385 let request = PeerRequest::GetBlockBodies { request, response };
386 let response = PeerResponse::BlockBodies { response: rx };
387 (request, response)
388 }
389 };
390 let _ = peer.request_tx.to_session_tx.try_send(request);
391 peer.pending_response = Some(response);
392 }
393 }
394
395 fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
397 match outcome {
398 BlockResponseOutcome::Request(peer, request) => {
399 self.handle_block_request(peer, request);
400 }
401 BlockResponseOutcome::BadResponse(peer, reputation_change) => {
402 self.peers_manager.apply_reputation_change(&peer, reputation_change);
403 }
404 }
405 }
406
407 fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
413 let outcome = match resp {
414 PeerResponseResult::BlockHeaders(res) => {
415 self.state_fetcher.on_block_headers_response(peer, res)
416 }
417 PeerResponseResult::BlockBodies(res) => {
418 self.state_fetcher.on_block_bodies_response(peer, res)
419 }
420 _ => None,
421 };
422
423 if let Some(outcome) = outcome {
424 self.on_block_response_outcome(outcome);
425 }
426 }
427
    /// Advances the state and returns the next [`StateAction`], if any.
    ///
    /// Each pass first returns a buffered action if one exists. Otherwise it polls discovery, the
    /// state fetcher, the pending responses of all active peers and the peers manager, in that
    /// order. If any of them queued an action the loop restarts and returns it; otherwise
    /// `Poll::Pending` is returned.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
        loop {
            // Hand out buffered actions first.
            if let Some(message) = self.queued_messages.pop_front() {
                return Poll::Ready(message)
            }

            // Drain discovery; handling an event may queue an action.
            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
                self.on_discovery_event(discovery);
            }

            // Drain the fetcher; every block request is forwarded to the target peer's session.
            while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
                match action {
                    FetchAction::BlockRequest { peer_id, request } => {
                        self.handle_block_request(peer_id, request)
                    }
                }
            }

            loop {
                // Results are collected first because `self` cannot be mutated through its
                // methods while `active_peers` is being iterated.
                let mut closed_sessions = Vec::new();
                let mut received_responses = Vec::new();

                // Poll every peer that has a response outstanding.
                for (id, peer) in &mut self.active_peers {
                    let Some(mut response) = peer.pending_response.take() else { continue };
                    match response.poll(cx) {
                        Poll::Ready(res) => {
                            // A closed response channel is handled as a closed session.
                            if res.err().is_some_and(|err| err.is_channel_closed()) {
                                debug!(
                                    target: "net",
                                    ?id,
                                    "Request canceled, response channel from session closed."
                                );
                                closed_sessions.push(*id);
                            } else {
                                received_responses.push((*id, res));
                            }
                        }
                        Poll::Pending => {
                            // Not ready yet: put the response back so it is polled again.
                            peer.pending_response = Some(response);
                        }
                    };
                }

                for peer in closed_sessions {
                    self.on_session_closed(peer)
                }

                if received_responses.is_empty() {
                    break;
                }

                // Handling a response can issue a follow-up request, which installs a new pending
                // response; the surrounding loop therefore polls the peers again.
                for (peer_id, resp) in received_responses {
                    self.on_eth_response(peer_id, resp);
                }
            }

            // Drain the peers manager.
            while let Poll::Ready(action) = self.peers_manager.poll(cx) {
                self.on_peer_action(action);
            }

            // Nothing was queued during this pass; otherwise loop to return the first action.
            if self.queued_messages.is_empty() {
                return Poll::Pending
            }
        }
    }
507}
508
/// State kept for a peer with an active session.
#[derive(Debug)]
pub(crate) struct ActivePeer<N: NetworkPrimitives> {
    /// Best block hash recorded for the peer; initialised from the hash in its status.
    pub(crate) best_hash: B256,
    /// Capabilities supplied when the session was activated. Not read anywhere yet, hence the
    /// `dead_code` expectation.
    #[expect(dead_code)]
    pub(crate) capabilities: Arc<Capabilities>,
    /// Sender used to pass requests to the peer's session.
    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Response to the most recently sent block request, if it has not resolved yet.
    pub(crate) pending_response: Option<PeerResponse<N>>,
    /// Hashes of blocks this peer is known to have seen, bounded by `PEER_BLOCK_CACHE_LIMIT`.
    pub(crate) blocks: LruCache<B256>,
}
526
/// Actions produced by [`NetworkState::poll`].
#[derive(Debug)]
pub(crate) enum StateAction<N: NetworkPrimitives> {
    /// A new block should be sent to the peer. Queued by `announce_new_block`.
    NewBlock {
        /// Target peer.
        peer_id: PeerId,
        /// The block message to send.
        block: NewBlockMessage<N::NewBlockPayload>,
    },
    /// New block hashes should be sent to the peer. Queued by `announce_new_block_hash`.
    NewBlockHashes {
        /// Target peer.
        peer_id: PeerId,
        /// The hash/number pairs to announce.
        hashes: NewBlockHashes,
    },
    /// Produced from `PeerAction::Connect`.
    Connect { remote_addr: SocketAddr, peer_id: PeerId },
    /// Produced from the peers manager's disconnect actions.
    Disconnect {
        /// Peer to disconnect.
        peer_id: PeerId,
        /// Reason carried by `PeerAction::Disconnect`; `None` for banned or untrusted incoming
        /// peers.
        reason: Option<DisconnectReason>,
    },
    /// Produced from `DiscoveryEvent::EnrForkId`.
    DiscoveredEnrForkId {
        /// Peer the fork id belongs to.
        peer_id: PeerId,
        /// The reported fork id.
        fork_id: ForkId,
    },
    /// Produced from `DiscoveryEvent::NewNode`.
    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
    /// Produced from `PeerAction::PeerAdded`.
    PeerAdded(PeerId),
    /// Produced from `PeerAction::PeerRemoved`.
    PeerRemoved(PeerId),
}
564
565#[cfg(test)]
566mod tests {
567 use crate::{
568 discovery::Discovery,
569 fetch::StateFetcher,
570 peers::PeersManager,
571 state::{BlockNumReader, NetworkState},
572 PeerRequest,
573 };
574 use alloy_consensus::Header;
575 use alloy_primitives::B256;
576 use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
577 use reth_ethereum_primitives::BlockBody;
578 use reth_network_api::PeerRequestSender;
579 use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
580 use reth_network_peers::PeerId;
581 use reth_storage_api::noop::NoopProvider;
582 use std::{
583 future::poll_fn,
584 sync::{atomic::AtomicU64, Arc},
585 };
586 use tokio::sync::mpsc;
587 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
588
589 fn state() -> NetworkState<EthNetworkPrimitives> {
591 let peers = PeersManager::default();
592 let handle = peers.handle();
593 NetworkState {
594 active_peers: Default::default(),
595 peers_manager: Default::default(),
596 queued_messages: Default::default(),
597 client: BlockNumReader(Box::new(NoopProvider::default())),
598 discovery: Discovery::noop(),
599 state_fetcher: StateFetcher::new(handle, Default::default()),
600 }
601 }
602
603 fn capabilities() -> Arc<Capabilities> {
604 Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
605 }
606
    /// A request whose response channel is dropped by the session must resolve to
    /// [`RequestError::ConnectionDropped`], while an earlier answered request succeeds.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dropped_active_session() {
        let mut state = state();
        let client = state.fetch_client();

        let peer_id = PeerId::random();
        let (tx, session_rx) = mpsc::channel(1);
        let peer_tx = PeerRequestSender::new(peer_id, tx);

        state.on_session_activated(
            peer_id,
            capabilities(),
            Arc::default(),
            peer_tx,
            Arc::new(AtomicU64::new(1)),
            None,
        );

        assert!(state.active_peers.contains_key(&peer_id));

        let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };

        let body_response = body.clone();

        // Stand-in for the peer session.
        tokio::task::spawn(async move {
            let mut stream = ReceiverStream::new(session_rx);
            // Answer the first request with a single body.
            let resp = stream.next().await.unwrap();
            match resp {
                PeerRequest::GetBlockBodies { response, .. } => {
                    response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
                }
                _ => unreachable!(),
            }

            // Receive the second request but never answer it; it is dropped, together with its
            // response sender, when this task ends.
            let _resp = stream.next().await.unwrap();
        });

        // Keep driving the state so requests and responses are processed.
        tokio::task::spawn(async move {
            loop {
                poll_fn(|cx| state.poll(cx)).await;
            }
        });

        // The first request is answered by the session task.
        let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
        assert_eq!(peer, peer_id);
        assert_eq!(bodies, vec![body]);

        // The second request's response channel is dropped by the session task.
        let resp = client.get_block_bodies(vec![B256::random()]).await;
        assert!(resp.is_err());
        assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
    }
664}