1use crate::{
4 cache::LruCache,
5 discovery::Discovery,
6 fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7 message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8 peers::{PeerAction, PeersManager},
9 session::BlockRangeInfo,
10 FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16 BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, GetReceipts70,
17 NetworkPrimitives, NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_p2p::receipts::client::ReceiptsResponse;
22use reth_network_peers::PeerId;
23use reth_network_types::{PeerAddr, PeerKind};
24use reth_primitives_traits::Block;
25use std::{
26 collections::{HashMap, VecDeque},
27 fmt,
28 net::{IpAddr, SocketAddr},
29 ops::Deref,
30 sync::{
31 atomic::{AtomicU64, AtomicUsize},
32 Arc,
33 },
34 task::{Context, Poll},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace};
38
/// Limit handed to `LruCache::new` when creating the `blocks` cache of each newly activated
/// peer.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
41
/// Wrapper around a boxed [`reth_storage_api::BlockNumReader`] trait object, so the state does
/// not have to be generic over the concrete reader type.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
44
45impl BlockNumReader {
46 pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
48 Self(Box::new(reader))
49 }
50}
51
52impl fmt::Debug for BlockNumReader {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
55 }
56}
57
58impl Deref for BlockNumReader {
59 type Target = Box<dyn reth_storage_api::BlockNumReader>;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
/// Tracks the state of all active peers and drives discovery, the peers manager and the state
/// fetcher.
///
/// Work produced by those components is translated into [`StateAction`]s, buffered in
/// `queued_messages`, and handed out one at a time by `poll`.
#[derive(Debug)]
pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Peers with an active session, keyed by peer id. Entries are added in
    /// `on_session_activated` and removed in `on_session_closed`.
    active_peers: HashMap<PeerId, ActivePeer<N>>,
    /// Peers manager; the `PeerAction`s it yields are handled in `poll`.
    peers_manager: PeersManager,
    /// Actions buffered until they are returned from `poll`.
    queued_messages: VecDeque<StateAction<N>>,
    /// Used to look up the block number of a peer's announced block hash when the session status
    /// does not carry a block number.
    client: BlockNumReader,
    /// Discovery service; the events it yields are handled in `poll`.
    discovery: Discovery,
    /// Fetcher that produces block requests for peers and consumes their responses.
    state_fetcher: StateFetcher<N>,
}
98
99impl<N: NetworkPrimitives> NetworkState<N> {
100 pub(crate) fn new(
102 client: BlockNumReader,
103 discovery: Discovery,
104 peers_manager: PeersManager,
105 num_active_peers: Arc<AtomicUsize>,
106 ) -> Self {
107 let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
108 Self {
109 active_peers: Default::default(),
110 peers_manager,
111 queued_messages: Default::default(),
112 client,
113 discovery,
114 state_fetcher,
115 }
116 }
117
118 pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
120 &mut self.peers_manager
121 }
122
123 pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
125 &mut self.discovery
126 }
127
128 pub(crate) const fn peers(&self) -> &PeersManager {
130 &self.peers_manager
131 }
132
133 pub(crate) fn fetch_client(&self) -> FetchClient<N> {
135 self.state_fetcher.client()
136 }
137
138 pub fn num_active_peers(&self) -> usize {
140 self.active_peers.len()
141 }
142
143 pub(crate) fn on_session_activated(
148 &mut self,
149 peer: PeerId,
150 capabilities: Arc<Capabilities>,
151 status: Arc<UnifiedStatus>,
152 request_tx: PeerRequestSender<PeerRequest<N>>,
153 timeout: Arc<AtomicU64>,
154 range_info: Option<BlockRangeInfo>,
155 ) {
156 debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
157
158 let block_number = status.latest_block.unwrap_or_else(|| {
161 self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default()
162 });
163 self.state_fetcher.new_active_peer(
164 peer,
165 status.blockhash,
166 block_number,
167 Arc::clone(&capabilities),
168 timeout,
169 range_info,
170 );
171
172 self.active_peers.insert(
173 peer,
174 ActivePeer {
175 best_hash: status.blockhash,
176 capabilities,
177 request_tx,
178 pending_response: None,
179 blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
180 },
181 );
182 }
183
184 pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
188 self.active_peers.remove(&peer);
189 self.state_fetcher.on_session_closed(&peer);
190 }
191
192 pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
201 let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
204
205 let number = msg.block.block().header().number();
206 let mut count = 0;
207
208 let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
210 peers.shuffle(&mut rand::rng());
211
212 for (peer_id, peer) in peers {
213 if peer.blocks.contains(&msg.hash) {
214 continue
216 }
217
218 if count < num_propagate {
220 self.queued_messages
221 .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
222
223 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
225 peer.best_hash = msg.hash;
226 }
227
228 peer.blocks.insert(msg.hash);
230
231 count += 1;
232 }
233
234 if count >= num_propagate {
235 break
236 }
237 }
238 }
239
240 pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
243 let number = msg.block.block().header().number();
244 let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
245 for (peer_id, peer) in &mut self.active_peers {
246 if peer.blocks.contains(&msg.hash) {
247 continue
249 }
250
251 if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
252 peer.best_hash = msg.hash;
253 }
254
255 self.queued_messages.push_back(StateAction::NewBlockHashes {
256 peer_id: *peer_id,
257 hashes: hashes.clone(),
258 });
259 }
260 }
261
262 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
264 if let Some(peer) = self.active_peers.get_mut(peer_id) {
265 peer.best_hash = hash;
266 }
267 self.state_fetcher.update_peer_block(peer_id, hash, number);
268 }
269
270 pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
272 self.discovery.update_fork_id(fork_id)
273 }
274
275 pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
279 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
281 peer.blocks.insert(hash);
282 }
283 }
284
285 pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
287 if let Some(peer) = self.active_peers.get_mut(&peer_id) {
289 peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
290 }
291 }
292
293 pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
295 trace!(target: "net", ?ip, "Banning discovery");
296 self.discovery.ban_ip(ip)
297 }
298
299 pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
301 trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
302 self.discovery.ban(peer_id, ip)
303 }
304
305 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
307 self.peers_manager.add_trusted_peer_id(peer_id)
308 }
309
310 pub(crate) fn add_peer_kind(
312 &mut self,
313 peer_id: PeerId,
314 kind: Option<PeerKind>,
315 addr: PeerAddr,
316 ) {
317 self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
318 }
319
320 pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
322 self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
323 }
324
325 pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
327 match kind {
328 PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
329 PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
330 }
331 }
332
333 fn on_discovery_event(&mut self, event: DiscoveryEvent) {
335 match event {
336 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
337 self.queued_messages.push_back(StateAction::DiscoveredNode {
338 peer_id,
339 addr,
340 fork_id,
341 });
342 }
343 DiscoveryEvent::EnrForkId(record, fork_id) => {
344 let peer_id = record.id;
345 let tcp_addr = record.tcp_addr();
346 if tcp_addr.port() == 0 {
347 return
348 }
349 let udp_addr = record.udp_addr();
350 let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
351 self.queued_messages.push_back(StateAction::DiscoveredEnrForkId {
352 peer_id,
353 addr,
354 fork_id,
355 });
356 }
357 }
358 }
359
360 fn on_peer_action(&mut self, action: PeerAction) {
362 match action {
363 PeerAction::Connect { peer_id, remote_addr } => {
364 self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
365 }
366 PeerAction::Disconnect { peer_id, reason } => {
367 self.state_fetcher.on_pending_disconnect(&peer_id);
368 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
369 }
370 PeerAction::DisconnectBannedIncoming { peer_id } |
371 PeerAction::DisconnectUntrustedIncoming { peer_id } => {
372 self.state_fetcher.on_pending_disconnect(&peer_id);
373 self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
374 }
375 PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
376 self.ban_discovery(peer_id, ip_addr)
377 }
378 PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
379 PeerAction::PeerAdded(peer_id) => {
380 self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
381 }
382 PeerAction::PeerRemoved(peer_id) => {
383 self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
384 }
385 PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
386 }
387 }
388
389 fn handle_block_request(&mut self, peer_id: PeerId, request: BlockRequest) {
394 if let Some(ref mut peer) = self.active_peers.get_mut(&peer_id) {
395 let (request, response) = match request {
396 BlockRequest::GetBlockHeaders(request) => {
397 let (response, rx) = oneshot::channel();
398 let request = PeerRequest::GetBlockHeaders { request, response };
399 let response = PeerResponse::BlockHeaders { response: rx };
400 (request, response)
401 }
402 BlockRequest::GetBlockBodies(request) => {
403 let (response, rx) = oneshot::channel();
404 let request = PeerRequest::GetBlockBodies { request, response };
405 let response = PeerResponse::BlockBodies { response: rx };
406 (request, response)
407 }
408 BlockRequest::GetBlockAccessLists(request) => {
409 let (response, rx) = oneshot::channel();
410 let request = PeerRequest::GetBlockAccessLists { request, response };
411 let response = PeerResponse::BlockAccessLists { response: rx };
412 (request, response)
413 }
414 BlockRequest::GetReceipts(request) => {
415 if peer.capabilities.supports_eth_v70() {
416 let (response, rx) = oneshot::channel();
417 let request = PeerRequest::GetReceipts70 {
418 request: GetReceipts70 {
419 first_block_receipt_index: 0,
420 block_hashes: request.0,
421 },
422 response,
423 };
424 let response = PeerResponse::Receipts70 { response: rx };
425 (request, response)
426 } else if peer.capabilities.supports_eth_v69() {
427 let (response, rx) = oneshot::channel();
428 let request = PeerRequest::GetReceipts69 { request, response };
429 let response = PeerResponse::Receipts69 { response: rx };
430 (request, response)
431 } else {
432 let (response, rx) = oneshot::channel();
433 let request = PeerRequest::GetReceipts { request, response };
434 let response = PeerResponse::Receipts { response: rx };
435 (request, response)
436 }
437 }
438 };
439 let _ = peer.request_tx.to_session_tx.try_send(request);
440 peer.pending_response = Some(response);
441 }
442 }
443
444 fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
446 match outcome {
447 BlockResponseOutcome::Request(peer, request) => {
448 self.handle_block_request(peer, request);
449 }
450 BlockResponseOutcome::BadResponse(peer, reputation_change) => {
451 self.peers_manager.apply_reputation_change(&peer, reputation_change);
452 }
453 }
454 }
455
456 fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
462 let outcome = match resp {
463 PeerResponseResult::BlockHeaders(res) => {
464 self.state_fetcher.on_block_headers_response(peer, res)
465 }
466 PeerResponseResult::BlockBodies(res) => {
467 self.state_fetcher.on_block_bodies_response(peer, res)
468 }
469 PeerResponseResult::Receipts(res) => {
470 let normalized = res.map(|blocks| {
472 let receipts = blocks
473 .into_iter()
474 .map(|block_receipts| {
475 block_receipts.into_iter().map(|rwb| rwb.receipt).collect()
476 })
477 .collect();
478 ReceiptsResponse::new(receipts)
479 });
480 self.state_fetcher.on_receipts_response(peer, normalized)
481 }
482 PeerResponseResult::Receipts69(res) => {
483 let normalized = res.map(ReceiptsResponse::new);
484 self.state_fetcher.on_receipts_response(peer, normalized)
485 }
486 PeerResponseResult::Receipts70(res) => {
487 let normalized = res.map(ReceiptsResponse::from);
488 self.state_fetcher.on_receipts_response(peer, normalized)
489 }
490 PeerResponseResult::BlockAccessLists(res) => {
491 self.state_fetcher.on_block_access_lists_response(peer, res)
492 }
493 _ => None,
494 };
495
496 if let Some(outcome) = outcome {
497 self.on_block_response_outcome(outcome);
498 }
499 }
500
501 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
503 loop {
504 if let Some(message) = self.queued_messages.pop_front() {
506 return Poll::Ready(message)
507 }
508
509 while let Poll::Ready(discovery) = self.discovery.poll(cx) {
510 self.on_discovery_event(discovery);
511 }
512
513 while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
514 match action {
515 FetchAction::BlockRequest { peer_id, request } => {
516 self.handle_block_request(peer_id, request)
517 }
518 }
519 }
520
521 loop {
522 let mut closed_sessions = Vec::new();
524 let mut received_responses = Vec::new();
525
526 for (id, peer) in &mut self.active_peers {
528 let Some(mut response) = peer.pending_response.take() else { continue };
529 match response.poll(cx) {
530 Poll::Ready(res) => {
531 if res.err().is_some_and(|err| err.is_channel_closed()) {
533 debug!(
534 target: "net",
535 ?id,
536 "Request canceled, response channel from session closed."
537 );
538 closed_sessions.push(*id);
544 } else {
545 received_responses.push((*id, res));
546 }
547 }
548 Poll::Pending => {
549 peer.pending_response = Some(response);
551 }
552 };
553 }
554
555 for peer in closed_sessions {
556 self.on_session_closed(peer)
557 }
558
559 if received_responses.is_empty() {
560 break;
561 }
562
563 for (peer_id, resp) in received_responses {
564 self.on_eth_response(peer_id, resp);
565 }
566 }
567
568 while let Poll::Ready(action) = self.peers_manager.poll(cx) {
570 self.on_peer_action(action);
571 }
572
573 if self.queued_messages.is_empty() {
576 return Poll::Pending
577 }
578 }
579 }
580}
581
/// Per-peer state kept for every peer with an active session.
#[derive(Debug)]
pub(crate) struct ActivePeer<N: NetworkPrimitives> {
    /// Hash of the peer's best known block. Initialised from the session status and updated
    /// when new blocks are announced or reported for this peer.
    pub(crate) best_hash: B256,
    /// Capabilities of the session; consulted when choosing which receipts request to send.
    pub(crate) capabilities: Arc<Capabilities>,
    /// Sender used to dispatch requests to the peer's session.
    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Response of the most recently dispatched request that has not completed yet, if any.
    pub(crate) pending_response: Option<PeerResponse<N>>,
    /// Block hashes recorded for this peer (announced to it or reported by it); used to skip
    /// re-announcing a block.
    pub(crate) blocks: LruCache<B256>,
}
598
/// Actions produced by [`NetworkState`] and returned from its `poll` method.
#[derive(Debug)]
pub(crate) enum StateAction<N: NetworkPrimitives> {
    /// Queued by `announce_new_block` for every peer selected to receive the full block.
    NewBlock {
        /// Target peer of the message.
        peer_id: PeerId,
        /// The block message to deliver.
        block: NewBlockMessage<N::NewBlockPayload>,
    },
    /// Queued by `announce_new_block_hash` for every peer that has not seen the block.
    NewBlockHashes {
        /// Target peer of the message.
        peer_id: PeerId,
        /// The announced hash/number pairs.
        hashes: NewBlockHashes,
    },
    /// Queued in response to `PeerAction::Connect`.
    Connect { remote_addr: SocketAddr, peer_id: PeerId },
    /// Queued in response to any of the disconnect peer actions.
    Disconnect {
        /// Peer to disconnect.
        peer_id: PeerId,
        /// Reason forwarded from the peer action; `None` for banned or untrusted incoming
        /// connections.
        reason: Option<DisconnectReason>,
    },
    /// Queued when discovery reports a fork id for an ENR with a non-zero TCP port.
    DiscoveredEnrForkId {
        /// Id taken from the record.
        peer_id: PeerId,
        /// Address built from the record's TCP and UDP addresses.
        addr: PeerAddr,
        /// The reported fork id.
        fork_id: ForkId,
    },
    /// Queued when discovery reports a new node.
    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
    /// Queued in response to `PeerAction::PeerAdded`.
    PeerAdded(PeerId),
    /// Queued in response to `PeerAction::PeerRemoved`.
    PeerRemoved(PeerId),
}
638
639#[cfg(test)]
640mod tests {
641 use crate::{
642 discovery::Discovery,
643 fetch::StateFetcher,
644 peers::PeersManager,
645 state::{BlockNumReader, NetworkState},
646 PeerRequest,
647 };
648 use alloy_consensus::Header;
649 use alloy_primitives::B256;
650 use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
651 use reth_ethereum_primitives::BlockBody;
652 use reth_network_api::PeerRequestSender;
653 use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
654 use reth_network_peers::PeerId;
655 use reth_storage_api::noop::NoopProvider;
656 use std::{
657 future::poll_fn,
658 sync::{atomic::AtomicU64, Arc},
659 };
660 use tokio::sync::mpsc;
661 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
662
663 fn state() -> NetworkState<EthNetworkPrimitives> {
665 let peers = PeersManager::default();
666 let handle = peers.handle();
667 NetworkState {
668 active_peers: Default::default(),
669 peers_manager: Default::default(),
670 queued_messages: Default::default(),
671 client: BlockNumReader(Box::new(NoopProvider::default())),
672 discovery: Discovery::noop(),
673 state_fetcher: StateFetcher::new(handle, Default::default()),
674 }
675 }
676
677 fn capabilities() -> Arc<Capabilities> {
678 Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
679 }
680
681 #[tokio::test(flavor = "multi_thread")]
684 async fn test_dropped_active_session() {
685 let mut state = state();
686 let client = state.fetch_client();
687
688 let peer_id = PeerId::random();
689 let (tx, session_rx) = mpsc::channel(1);
690 let peer_tx = PeerRequestSender::new(peer_id, tx);
691
692 state.on_session_activated(
693 peer_id,
694 capabilities(),
695 Arc::default(),
696 peer_tx,
697 Arc::new(AtomicU64::new(1)),
698 None,
699 );
700
701 assert!(state.active_peers.contains_key(&peer_id));
702
703 let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
704
705 let body_response = body.clone();
706
707 tokio::task::spawn(async move {
709 let mut stream = ReceiverStream::new(session_rx);
710 let resp = stream.next().await.unwrap();
711 match resp {
712 PeerRequest::GetBlockBodies { response, .. } => {
713 response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
714 }
715 _ => unreachable!(),
716 }
717
718 let _resp = stream.next().await.unwrap();
720 });
721
722 tokio::task::spawn(async move {
724 loop {
725 poll_fn(|cx| state.poll(cx)).await;
726 }
727 });
728
729 let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
731 assert_eq!(peer, peer_id);
732 assert_eq!(bodies, vec![body]);
733
734 let resp = client.get_block_bodies(vec![B256::random()]).await;
735 assert!(resp.is_err());
736 assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
737 }
738}