1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 session::BlockRangeInfo,
10 FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
17 NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_peers::PeerId;
22use reth_network_types::{PeerAddr, PeerKind};
23use reth_primitives_traits::Block;
24use std::{
25 collections::{HashMap, VecDeque},
26 fmt,
27 net::{IpAddr, SocketAddr},
28 ops::Deref,
29 sync::{
30 atomic::{AtomicU64, AtomicUsize},
31 Arc,
32 },
33 task::{Context, Poll},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace};
37
38const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
40
41pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
43
44impl BlockNumReader {
45 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
47 Self(Box::new(reader))
48 }
49}
50
51impl fmt::Debug for BlockNumReader {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
54 }
55}
56
57impl Deref for BlockNumReader {
58 type Target = Box<dyn reth_storage_api::BlockNumReader>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65#[derive(Debug)]
76pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
77 active_peers: HashMap<PeerId, ActivePeer<N>>,
79 peers_manager: PeersManager,
81 queued_messages: VecDeque<StateAction<N>>,
83 client: BlockNumReader,
88 discovery: Discovery,
90 state_fetcher: StateFetcher<N>,
96}
97
98impl<N: NetworkPrimitives> NetworkState<N> {
99 pub(crate) fn new(
101 client: BlockNumReader,
102 discovery: Discovery,
103 peers_manager: PeersManager,
104 num_active_peers: Arc<AtomicUsize>,
105 ) -> Self {
106 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
107 Self {
108 active_peers: Default::default(),
109 peers_manager,
110 queued_messages: Default::default(),
111 client,
112 discovery,
113 state_fetcher,
114 }
115 }
116
117 pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
119 &mut self.peers_manager
120 }
121
122 pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
124 &mut self.discovery
125 }
126
127 pub(crate) const fn peers(&self) -> &PeersManager {
129 &self.peers_manager
130 }
131
132 pub(crate) fn fetch_client(&self) -> FetchClient<N> {
134 self.state_fetcher.client()
135 }
136
137 pub fn num_active_peers(&self) -> usize {
139 self.active_peers.len()
140 }
141
142 pub(crate) fn on_session_activated(
147 &mut self,
148 peer: PeerId,
149 capabilities: Arc<Capabilities>,
150 status: Arc<UnifiedStatus>,
151 request_tx: PeerRequestSender<PeerRequest<N>>,
152 timeout: Arc<AtomicU64>,
153 range_info: Option<BlockRangeInfo>,
154 ) {
155 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
156
157 let block_number =
159 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
160 self.state_fetcher.new_active_peer(
161 peer,
162 status.blockhash,
163 block_number,
164 Arc::clone(&capabilities),
165 timeout,
166 range_info,
167 );
168
169 self.active_peers.insert(
170 peer,
171 ActivePeer {
172 best_hash: status.blockhash,
173 capabilities,
174 request_tx,
175 pending_response: None,
176 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
177 },
178 );
179 }
180
181 pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
185 self.active_peers.remove(&peer);
186 self.state_fetcher.on_session_closed(&peer);
187 }
188
189 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
198 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
201
202 let number = msg.block.block().header().number();
203 let mut count = 0;
204
205 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
207 peers.shuffle(&mut rand::rng());
208
209 for (peer_id, peer) in peers {
210 if peer.blocks.contains(&msg.hash) {
211 continue
213 }
214
215 if count < num_propagate {
217 self.queued_messages
218 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
219
220 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
222 peer.best_hash = msg.hash;
223 }
224
225 peer.blocks.insert(msg.hash);
227
228 count += 1;
229 }
230
231 if count >= num_propagate {
232 break
233 }
234 }
235 }
236
237 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
240 let number = msg.block.block().header().number();
241 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
242 for (peer_id, peer) in &mut self.active_peers {
243 if peer.blocks.contains(&msg.hash) {
244 continue
246 }
247
248 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
249 peer.best_hash = msg.hash;
250 }
251
252 self.queued_messages.push_back(StateAction::NewBlockHashes {
253 peer_id: *peer_id,
254 hashes: hashes.clone(),
255 });
256 }
257 }
258
259 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
261 if let Some(peer) = self.active_peers.get_mut(peer_id) {
262 peer.best_hash = hash;
263 }
264 self.state_fetcher.update_peer_block(peer_id, hash, number);
265 }
266
267 pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
269 self.discovery.update_fork_id(fork_id)
270 }
271
272 pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
276 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
278 peer.blocks.insert(hash);
279 }
280 }
281
282 pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
284 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
286 peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
287 }
288 }
289
290 pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
292 trace!(target: "net", ?ip, "Banning discovery");
293 self.discovery.ban_ip(ip)
294 }
295
296 pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
298 trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
299 self.discovery.ban(peer_id, ip)
300 }
301
302 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
304 self.peers_manager.add_trusted_peer_id(peer_id)
305 }
306
307 pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
309 self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
310 }
311
312 pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
314 self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
315 }
316
317 pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
319 match kind {
320 PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
321 PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
322 }
323 }
324
325 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
327 match event {
328 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
329 self.queued_messages.push_back(StateAction::DiscoveredNode {
330 peer_id,
331 addr,
332 fork_id,
333 });
334 }
335 DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
336 self.queued_messages
337 .push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
338 }
339 }
340 }
341
342 fn on_peer_action(&mut self, action: PeerAction) {
344 match action {
345 PeerAction::Connect { peer_id, remote_addr } => {
346 self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
347 }
348 PeerAction::Disconnect { peer_id, reason } => {
349 self.state_fetcher.on_pending_disconnect(&peer_id);
350 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
351 }
352 PeerAction::DisconnectBannedIncoming { peer_id } |
353 PeerAction::DisconnectUntrustedIncoming { peer_id } => {
354 self.state_fetcher.on_pending_disconnect(&peer_id);
355 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
356 }
357 PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
358 self.ban_discovery(peer_id, ip_addr)
359 }
360 PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
361 PeerAction::PeerAdded(peer_id) => {
362 self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
363 }
364 PeerAction::PeerRemoved(peer_id) => {
365 self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
366 }
367 PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
368 }
369 }
370
371 fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
376 if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
377 let (request, response) = match request {
378 BlockRequest::GetBlockHeaders(request) => {
379 let (response, rx) = oneshot::channel();
380 let request = PeerRequest::GetBlockHeaders { request, response };
381 let response = PeerResponse::BlockHeaders { response: rx };
382 (request, response)
383 }
384 BlockRequest::GetBlockBodies(request) => {
385 let (response, rx) = oneshot::channel();
386 let request = PeerRequest::GetBlockBodies { request, response };
387 let response = PeerResponse::BlockBodies { response: rx };
388 (request, response)
389 }
390 };
391 let _ = peer.request_tx.to_session_tx.try_send(request);
392 peer.pending_response = Some(response);
393 }
394 }
395
396 fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
398 match outcome {
399 BlockResponseOutcome::Request(peer, request) => {
400 self.handle_block_request(peer, request);
401 }
402 BlockResponseOutcome::BadResponse(peer, reputation_change) => {
403 self.peers_manager.apply_reputation_change(&peer, reputation_change);
404 }
405 }
406 }
407
408 fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
414 let outcome = match resp {
415 PeerResponseResult::BlockHeaders(res) => {
416 self.state_fetcher.on_block_headers_response(peer, res)
417 }
418 PeerResponseResult::BlockBodies(res) => {
419 self.state_fetcher.on_block_bodies_response(peer, res)
420 }
421 _ => None,
422 };
423
424 if let Some(outcome) = outcome {
425 self.on_block_response_outcome(outcome);
426 }
427 }
428
429 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
431 loop {
432 if let Some(message) = self.queued_messages.pop_front() {
434 return Poll::Ready(message)
435 }
436
437 while let Poll::Ready(discovery) = self.discovery.poll(cx) {
438 self.on_discovery_event(discovery);
439 }
440
441 while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
442 match action {
443 FetchAction::BlockRequest { peer_id, request } => {
444 self.handle_block_request(peer_id, request)
445 }
446 }
447 }
448
449 loop {
450 let mut closed_sessions = Vec::new();
452 let mut received_responses = Vec::new();
453
454 for (id, peer) in &mut self.active_peers {
456 let Some(mut response) = peer.pending_response.take() else { continue };
457 match response.poll(cx) {
458 Poll::Ready(res) => {
459 if res.err().is_some_and(|err| err.is_channel_closed()) {
461 debug!(
462 target: "net",
463 ?id,
464 "Request canceled, response channel from session closed."
465 );
466 closed_sessions.push(*id);
472 } else {
473 received_responses.push((*id, res));
474 }
475 }
476 Poll::Pending => {
477 peer.pending_response = Some(response);
479 }
480 };
481 }
482
483 for peer in closed_sessions {
484 self.on_session_closed(peer)
485 }
486
487 if received_responses.is_empty() {
488 break;
489 }
490
491 for (peer_id, resp) in received_responses {
492 self.on_eth_response(peer_id, resp);
493 }
494 }
495
496 while let Poll::Ready(action) = self.peers_manager.poll(cx) {
498 self.on_peer_action(action);
499 }
500
501 if self.queued_messages.is_empty() {
504 return Poll::Pending
505 }
506 }
507 }
508}
509
510#[derive(Debug)]
514pub(crate) struct ActivePeer<N: NetworkPrimitives> {
515 pub(crate) best_hash: B256,
517 #[expect(dead_code)]
519 pub(crate) capabilities: Arc<Capabilities>,
520 pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
522 pub(crate) pending_response: Option<PeerResponse<N>>,
524 pub(crate) blocks: LruCache<B256>,
526}
527
528#[derive(Debug)]
530pub(crate) enum StateAction<N: NetworkPrimitives> {
531 NewBlock {
533 peer_id: PeerId,
535 block: NewBlockMessage<N::NewBlockPayload>,
537 },
538 NewBlockHashes {
539 peer_id: PeerId,
541 hashes: NewBlockHashes,
543 },
544 Connect { remote_addr: SocketAddr, peer_id: PeerId },
546 Disconnect {
548 peer_id: PeerId,
549 reason: Option<DisconnectReason>,
551 },
552 DiscoveredEnrForkId {
554 peer_id: PeerId,
555 fork_id: ForkId,
557 },
558 DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
560 PeerAdded(PeerId),
562 PeerRemoved(PeerId),
564}
565
566#[cfg(test)]
567mod tests {
568 use crate::{
569 discovery::Discovery,
570 fetch::StateFetcher,
571 peers::PeersManager,
572 state::{BlockNumReader, NetworkState},
573 PeerRequest,
574 };
575 use alloy_consensus::Header;
576 use alloy_primitives::B256;
577 use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
578 use reth_ethereum_primitives::BlockBody;
579 use reth_network_api::PeerRequestSender;
580 use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
581 use reth_network_peers::PeerId;
582 use reth_storage_api::noop::NoopProvider;
583 use std::{
584 future::poll_fn,
585 sync::{atomic::AtomicU64, Arc},
586 };
587 use tokio::sync::mpsc;
588 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
589
590 fn state() -> NetworkState<EthNetworkPrimitives> {
592 let peers = PeersManager::default();
593 let handle = peers.handle();
594 NetworkState {
595 active_peers: Default::default(),
596 peers_manager: Default::default(),
597 queued_messages: Default::default(),
598 client: BlockNumReader(Box::new(NoopProvider::default())),
599 discovery: Discovery::noop(),
600 state_fetcher: StateFetcher::new(handle, Default::default()),
601 }
602 }
603
604 fn capabilities() -> Arc<Capabilities> {
605 Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
606 }
607
608 #[tokio::test(flavor = "multi_thread")]
611 async fn test_dropped_active_session() {
612 let mut state = state();
613 let client = state.fetch_client();
614
615 let peer_id = PeerId::random();
616 let (tx, session_rx) = mpsc::channel(1);
617 let peer_tx = PeerRequestSender::new(peer_id, tx);
618
619 state.on_session_activated(
620 peer_id,
621 capabilities(),
622 Arc::default(),
623 peer_tx,
624 Arc::new(AtomicU64::new(1)),
625 None,
626 );
627
628 assert!(state.active_peers.contains_key(&peer_id));
629
630 let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
631
632 let body_response = body.clone();
633
634 tokio::task::spawn(async move {
636 let mut stream = ReceiverStream::new(session_rx);
637 let resp = stream.next().await.unwrap();
638 match resp {
639 PeerRequest::GetBlockBodies { response, .. } => {
640 response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
641 }
642 _ => unreachable!(),
643 }
644
645 let _resp = stream.next().await.unwrap();
647 });
648
649 tokio::task::spawn(async move {
651 loop {
652 poll_fn(|cx| state.poll(cx)).await;
653 }
654 });
655
656 let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
658 assert_eq!(peer, peer_id);
659 assert_eq!(bodies, vec![body]);
660
661 let resp = client.get_block_bodies(vec![B256::random()]).await;
662 assert!(resp.is_err());
663 assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
664 }
665}