1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 FetchClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::B256;
13use rand::seq::SliceRandom;
14use reth_eth_wire::{
15 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
16 NewBlockHashes, Status,
17};
18use reth_ethereum_forks::ForkId;
19use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
20use reth_network_peers::PeerId;
21use reth_network_types::{PeerAddr, PeerKind};
22use reth_primitives_traits::Block;
23use std::{
24 collections::{HashMap, VecDeque},
25 fmt,
26 net::{IpAddr, SocketAddr},
27 ops::Deref,
28 sync::{
29 atomic::{AtomicU64, AtomicUsize},
30 Arc,
31 },
32 task::{Context, Poll},
33};
34use tokio::sync::oneshot;
35use tracing::{debug, trace};
36
/// Capacity passed to the per-peer [`LruCache`] of block hashes (`ActivePeer::blocks`) when a
/// session is activated.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
39
/// Type-erased wrapper around a boxed [`reth_storage_api::BlockNumReader`] trait object.
///
/// Keeping the reader behind a `Box<dyn ..>` means holders of this type do not need a generic
/// parameter for the concrete reader.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
42
43impl BlockNumReader {
44 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
46 Self(Box::new(reader))
47 }
48}
49
50impl fmt::Debug for BlockNumReader {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
53 }
54}
55
/// Dereferences to the boxed reader so the trait's methods can be called directly on the
/// wrapper (e.g. `client.block_number(..)`).
impl Deref for BlockNumReader {
    type Target = Box<dyn reth_storage_api::BlockNumReader>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
63
/// Network-level state: the set of active peer sessions plus the components that are driven
/// from [`NetworkState::poll`] (discovery, the state fetcher and the peers manager).
///
/// Work produced by those components is turned into [`StateAction`]s, buffered in
/// `queued_messages`, and handed out one at a time by `poll`.
#[derive(Debug)]
pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Peers with an activated session, keyed by peer id. Entries are inserted in
    /// `on_session_activated` and removed in `on_session_closed`.
    active_peers: HashMap<PeerId, ActivePeer<N>>,
    /// Source of [`PeerAction`]s; polled in `poll` and used for peer bookkeeping and
    /// reputation changes.
    peers_manager: PeersManager,
    /// Actions waiting to be returned from `poll`, in FIFO order.
    queued_messages: VecDeque<StateAction<N>>,
    /// Used to look up the block number for the hash a peer reports in its status.
    client: BlockNumReader,
    /// Discovery service; polled for [`DiscoveryEvent`]s and informed about bans and fork id
    /// updates.
    discovery: Discovery,
    /// Tracks per-peer block info, emits block requests and processes the responses.
    state_fetcher: StateFetcher<N>,
}
96
97impl<N: NetworkPrimitives> NetworkState<N> {
98 pub(crate) fn new(
100 client: BlockNumReader,
101 discovery: Discovery,
102 peers_manager: PeersManager,
103 num_active_peers: Arc<AtomicUsize>,
104 ) -> Self {
105 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
106 Self {
107 active_peers: Default::default(),
108 peers_manager,
109 queued_messages: Default::default(),
110 client,
111 discovery,
112 state_fetcher,
113 }
114 }
115
    /// Returns mutable access to the [`PeersManager`].
    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
        &mut self.peers_manager
    }
120
    /// Returns mutable access to the [`Discovery`] service.
    pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
        &mut self.discovery
    }
125
    /// Returns shared access to the [`PeersManager`].
    pub(crate) const fn peers(&self) -> &PeersManager {
        &self.peers_manager
    }
130
    /// Returns a new [`FetchClient`] obtained from the state fetcher.
    pub(crate) fn fetch_client(&self) -> FetchClient<N> {
        self.state_fetcher.client()
    }
135
    /// Returns how many peers currently have an entry in the active-peer map.
    pub fn num_active_peers(&self) -> usize {
        self.active_peers.len()
    }
140
141 pub(crate) fn on_session_activated(
146 &mut self,
147 peer: PeerId,
148 capabilities: Arc<Capabilities>,
149 status: Arc<Status>,
150 request_tx: PeerRequestSender<PeerRequest<N>>,
151 timeout: Arc<AtomicU64>,
152 ) {
153 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
154
155 let block_number =
157 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
158 self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout);
159
160 self.active_peers.insert(
161 peer,
162 ActivePeer {
163 best_hash: status.blockhash,
164 capabilities,
165 request_tx,
166 pending_response: None,
167 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
168 },
169 );
170 }
171
    /// Drops all state kept for `peer`: removes it from the active-peer map (discarding any
    /// pending response stored there) and notifies the state fetcher that the session closed.
    pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
        self.active_peers.remove(&peer);
        self.state_fetcher.on_session_closed(&peer);
    }
179
180 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::Block>) {
189 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
192
193 let number = msg.block.block.header().number();
194 let mut count = 0;
195
196 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
198 peers.shuffle(&mut rand::rng());
199
200 for (peer_id, peer) in peers {
201 if peer.blocks.contains(&msg.hash) {
202 continue
204 }
205
206 if count < num_propagate {
208 self.queued_messages
209 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
210
211 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
213 peer.best_hash = msg.hash;
214 }
215
216 peer.blocks.insert(msg.hash);
218
219 count += 1;
220 }
221
222 if count >= num_propagate {
223 break
224 }
225 }
226 }
227
228 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::Block>) {
231 let number = msg.block.block.header().number();
232 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
233 for (peer_id, peer) in &mut self.active_peers {
234 if peer.blocks.contains(&msg.hash) {
235 continue
237 }
238
239 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
240 peer.best_hash = msg.hash;
241 }
242
243 self.queued_messages.push_back(StateAction::NewBlockHashes {
244 peer_id: *peer_id,
245 hashes: hashes.clone(),
246 });
247 }
248 }
249
    /// Records `hash` as the best hash of `peer_id` (if that peer is active) and forwards the
    /// hash/number pair to the state fetcher.
    ///
    /// The fetcher is informed whether or not the peer has an entry in the active-peer map, and
    /// the value it returns is ignored here.
    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
        if let Some(peer) = self.active_peers.get_mut(peer_id) {
            peer.best_hash = hash;
        }
        self.state_fetcher.update_peer_block(peer_id, hash, number);
    }
257
    /// Forwards the new [`ForkId`] to the discovery service.
    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
        self.discovery.update_fork_id(fork_id)
    }
262
    /// Records `hash` in the block cache of `peer_id`, so that later announcements of this
    /// block skip the peer. Does nothing if the peer is not active.
    pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
            peer.blocks.insert(hash);
        }
    }
272
    /// Records the hash of every entry in `hashes` in the block cache of `peer_id`; the block
    /// numbers are discarded. Does nothing if the peer is not active.
    pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
            peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
        }
    }
280
    /// Logs the ban at trace level and asks the discovery service to ban the IP address.
    pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
        trace!(target: "net", ?ip, "Banning discovery");
        self.discovery.ban_ip(ip)
    }
286
    /// Logs the ban at trace level and asks the discovery service to ban both the peer id and
    /// the IP address.
    pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
        trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
        self.discovery.ban(peer_id, ip)
    }
292
    /// Forwards to [`PeersManager::add_trusted_peer_id`] for the given peer id.
    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
        self.peers_manager.add_trusted_peer_id(peer_id)
    }
297
    /// Forwards to [`PeersManager::add_peer_kind`] with the given id, kind and address; the
    /// manager's trailing optional argument is always passed as `None`.
    pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
    }
302
    /// Forwards to [`PeersManager::add_and_connect_kind`] with the given id, kind and address;
    /// the manager's trailing optional argument is always passed as `None`.
    pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
    }
307
    /// Removes a peer through the peers manager, choosing the operation by `kind`.
    ///
    /// `Basic` and `Static` peers are removed via `remove_peer`; for `Trusted` the peer is
    /// removed via `remove_peer_from_trusted_set` instead.
    pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
        match kind {
            PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
            PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
        }
    }
315
316 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
318 match event {
319 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
320 self.queued_messages.push_back(StateAction::DiscoveredNode {
321 peer_id,
322 addr,
323 fork_id,
324 });
325 }
326 DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
327 self.queued_messages
328 .push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
329 }
330 }
331 }
332
    /// Handles a [`PeerAction`] emitted by the peers manager.
    ///
    /// Most actions are translated into a queued [`StateAction`]; discovery bans are applied
    /// directly and ban/unban notifications are ignored.
    fn on_peer_action(&mut self, action: PeerAction) {
        match action {
            PeerAction::Connect { peer_id, remote_addr } => {
                self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
            }
            PeerAction::Disconnect { peer_id, reason } => {
                // tell the fetcher before the disconnect is queued
                self.state_fetcher.on_pending_disconnect(&peer_id);
                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
            }
            PeerAction::DisconnectBannedIncoming { peer_id } |
            PeerAction::DisconnectUntrustedIncoming { peer_id } => {
                // same as a regular disconnect, but queued without a reason
                self.state_fetcher.on_pending_disconnect(&peer_id);
                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
            }
            PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
                self.ban_discovery(peer_id, ip_addr)
            }
            PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
            PeerAction::PeerAdded(peer_id) => {
                self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
            }
            PeerAction::PeerRemoved(peer_id) => {
                self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
            }
            // nothing to do at this level
            PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
        }
    }
361
362 fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
367 if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
368 let (request, response) = match request {
369 BlockRequest::GetBlockHeaders(request) => {
370 let (response, rx) = oneshot::channel();
371 let request = PeerRequest::GetBlockHeaders { request, response };
372 let response = PeerResponse::BlockHeaders { response: rx };
373 (request, response)
374 }
375 BlockRequest::GetBlockBodies(request) => {
376 let (response, rx) = oneshot::channel();
377 let request = PeerRequest::GetBlockBodies { request, response };
378 let response = PeerResponse::BlockBodies { response: rx };
379 (request, response)
380 }
381 };
382 let _ = peer.request_tx.to_session_tx.try_send(request);
383 peer.pending_response = Some(response);
384 }
385 }
386
    /// Acts on the outcome the state fetcher produced for a block response.
    ///
    /// A follow-up request is dispatched to the named peer; a bad response results in the
    /// given reputation change being applied to the peer via the peers manager.
    fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
        match outcome {
            BlockResponseOutcome::Request(peer, request) => {
                self.handle_block_request(peer, request);
            }
            BlockResponseOutcome::BadResponse(peer, reputation_change) => {
                self.peers_manager.apply_reputation_change(&peer, reputation_change);
            }
        }
    }
398
    /// Hands a response received from `peer` to the state fetcher and processes the outcome it
    /// returns, if any.
    ///
    /// Only block header and block body responses are forwarded; every other response kind
    /// produces no outcome and is dropped here.
    fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
        let outcome = match resp {
            PeerResponseResult::BlockHeaders(res) => {
                self.state_fetcher.on_block_headers_response(peer, res)
            }
            PeerResponseResult::BlockBodies(res) => {
                self.state_fetcher.on_block_bodies_response(peer, res)
            }
            _ => None,
        };

        if let Some(outcome) = outcome {
            self.on_block_response_outcome(outcome);
        }
    }
419
    /// Advances the state and returns the next [`StateAction`], if one is available.
    ///
    /// Each pass of the outer loop:
    /// 1. returns the oldest queued action, if any;
    /// 2. drains ready discovery events;
    /// 3. drains ready fetcher actions, dispatching block requests to peers;
    /// 4. polls the pending response of every active peer, repeating while responses keep
    ///    arriving;
    /// 5. drains ready peers-manager actions.
    ///
    /// Returns `Poll::Pending` once a full pass leaves the action queue empty; otherwise the
    /// loop restarts and the first queued action is returned.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
        loop {
            // drain buffered actions first
            if let Some(message) = self.queued_messages.pop_front() {
                return Poll::Ready(message)
            }

            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
                self.on_discovery_event(discovery);
            }

            while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
                match action {
                    FetchAction::BlockRequest { peer_id, request } => {
                        self.handle_block_request(peer_id, request)
                    }
                }
            }

            loop {
                // Results are buffered because `self` cannot be mutated further while
                // `active_peers` is being iterated mutably.
                let mut closed_sessions = Vec::new();
                let mut received_responses = Vec::new();

                // poll every active peer that has a response outstanding
                for (id, peer) in &mut self.active_peers {
                    let Some(mut response) = peer.pending_response.take() else { continue };
                    match response.poll(cx) {
                        Poll::Ready(res) => {
                            // a closed response channel is treated as a closed session
                            if res.err().is_some_and(|err| err.is_channel_closed()) {
                                debug!(
                                    target: "net",
                                    ?id,
                                    "Request canceled, response channel from session closed."
                                );
                                closed_sessions.push(*id);
                            } else {
                                received_responses.push((*id, res));
                            }
                        }
                        Poll::Pending => {
                            // not ready yet, put it back for the next poll
                            peer.pending_response = Some(response);
                        }
                    };
                }

                for peer in closed_sessions {
                    self.on_session_closed(peer)
                }

                if received_responses.is_empty() {
                    break;
                }

                // Handling a response may dispatch a follow-up request, which is why the
                // enclosing loop polls the peers again afterwards.
                for (peer_id, resp) in received_responses {
                    self.on_eth_response(peer_id, resp);
                }
            }

            while let Poll::Ready(action) = self.peers_manager.poll(cx) {
                self.on_peer_action(action);
            }

            // Only park when this pass produced nothing to return; otherwise loop around so
            // the queued action is handed out immediately.
            if self.queued_messages.is_empty() {
                return Poll::Pending
            }
        }
    }
499}
500
/// Per-peer state kept for a peer with an activated session.
#[derive(Debug)]
pub(crate) struct ActivePeer<N: NetworkPrimitives> {
    /// Hash of the best block currently recorded for the peer; initialised from the peer's
    /// status and updated when newer blocks are reported for it.
    pub(crate) best_hash: B256,
    /// Capabilities supplied when the session was activated. Stored but not read, hence the
    /// lint expectation.
    #[expect(dead_code)]
    pub(crate) capabilities: Arc<Capabilities>,
    /// Sender used to pass requests to the peer's session.
    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Response to the most recently dispatched block request, if it has not resolved yet.
    pub(crate) pending_response: Option<PeerResponse<N>>,
    /// Bounded cache of block hashes recorded for this peer; announcements skip peers whose
    /// cache already contains the block hash.
    pub(crate) blocks: LruCache<B256>,
}
518
/// Actions produced by [`NetworkState::poll`] for the caller to carry out.
#[derive(Debug)]
pub(crate) enum StateAction<N: NetworkPrimitives> {
    /// Send a full new-block message to the peer.
    NewBlock {
        /// Target peer.
        peer_id: PeerId,
        /// The block message to send.
        block: NewBlockMessage<N::Block>,
    },
    /// Send a new-block-hashes announcement to the peer.
    NewBlockHashes {
        /// Target peer.
        peer_id: PeerId,
        /// The hashes (with block numbers) to announce.
        hashes: NewBlockHashes,
    },
    /// Open a connection to the peer at the given address.
    Connect { remote_addr: SocketAddr, peer_id: PeerId },
    /// Disconnect the peer.
    Disconnect {
        /// Peer to disconnect.
        peer_id: PeerId,
        /// Reason to report, if any.
        reason: Option<DisconnectReason>,
    },
    /// Discovery reported a fork id for the peer.
    DiscoveredEnrForkId {
        /// Peer the fork id belongs to.
        peer_id: PeerId,
        /// The reported fork id.
        fork_id: ForkId,
    },
    /// Discovery reported a new node, optionally with its fork id.
    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
    /// The peers manager reported that a peer was added.
    PeerAdded(PeerId),
    /// The peers manager reported that a peer was removed.
    PeerRemoved(PeerId),
}
556
#[cfg(test)]
mod tests {
    use crate::{
        discovery::Discovery,
        fetch::StateFetcher,
        peers::PeersManager,
        state::{BlockNumReader, NetworkState},
        PeerRequest,
    };
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
    use reth_ethereum_primitives::BlockBody;
    use reth_network_api::PeerRequestSender;
    use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
    use reth_network_peers::PeerId;
    use reth_storage_api::noop::NoopProvider;
    use std::{
        future::poll_fn,
        sync::{atomic::AtomicU64, Arc},
    };
    use tokio::sync::mpsc;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    /// Returns a testing instance of [`NetworkState`] backed by a no-op provider and no-op
    /// discovery.
    fn state() -> NetworkState<EthNetworkPrimitives> {
        let peers = PeersManager::default();
        let handle = peers.handle();
        NetworkState {
            active_peers: Default::default(),
            // Store the same manager the fetcher's handle was taken from, mirroring
            // `NetworkState::new`, instead of dropping it and storing an unrelated one.
            peers_manager: peers,
            queued_messages: Default::default(),
            client: BlockNumReader(Box::new(NoopProvider::default())),
            discovery: Discovery::noop(),
            state_fetcher: StateFetcher::new(handle, Default::default()),
        }
    }

    /// Capabilities advertised by the test peer.
    fn capabilities() -> Arc<Capabilities> {
        Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
    }

    /// The first body request is answered by the fake session; the second one is received but
    /// never answered, and the client is expected to observe `ConnectionDropped`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dropped_active_session() {
        let mut state = state();
        let client = state.fetch_client();

        let peer_id = PeerId::random();
        let (tx, session_rx) = mpsc::channel(1);
        let peer_tx = PeerRequestSender::new(peer_id, tx);

        state.on_session_activated(
            peer_id,
            capabilities(),
            Arc::default(),
            peer_tx,
            Arc::new(AtomicU64::new(1)),
        );

        assert!(state.active_peers.contains_key(&peer_id));

        let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };

        let body_response = body.clone();

        // Fake session: answers the first request, then takes the second one and ends, which
        // drops that request without a reply.
        tokio::task::spawn(async move {
            let mut stream = ReceiverStream::new(session_rx);
            let resp = stream.next().await.unwrap();
            match resp {
                PeerRequest::GetBlockBodies { response, .. } => {
                    response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
                }
                _ => unreachable!(),
            }

            // wait for the next request; it is dropped unanswered when the task finishes
            let _resp = stream.next().await.unwrap();
        });

        // Drive the state in the background so requests and responses are processed.
        tokio::task::spawn(async move {
            loop {
                poll_fn(|cx| state.poll(cx)).await;
            }
        });

        // the first request succeeds and is served by our peer
        let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
        assert_eq!(peer, peer_id);
        assert_eq!(bodies, vec![body]);

        // the second request fails because the session dropped it
        let resp = client.get_block_bodies(vec![B256::random()]).await;
        assert!(resp.is_err());
        assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
    }
}