1use crate::{
2 wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::ConfigureEvm;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18 collections::VecDeque,
19 fmt::Debug,
20 future::{poll_fn, Future},
21 ops::Not,
22 pin::Pin,
23 sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26 },
27 task::{ready, Context, Poll},
28};
29use tokio::sync::{
30 mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31 watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
35pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
40
41pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum ExExNotificationSource {
56 Pipeline,
58 BlockchainTree,
60}
61
62#[derive(Metrics)]
64#[metrics(scope = "exex")]
65struct ExExMetrics {
66 notifications_sent_total: Counter,
68 events_sent_total: Counter,
70}
71
72#[derive(Debug)]
78pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
79 id: String,
81 metrics: ExExMetrics,
83 sender: PollSender<ExExNotification<N>>,
85 receiver: UnboundedReceiver<ExExEvent>,
87 next_notification_id: usize,
89 finished_height: Option<BlockNumHash>,
93}
94
95impl<N: NodePrimitives> ExExHandle<N> {
96 pub fn new<P, E: ConfigureEvm<Primitives = N>>(
101 id: String,
102 node_head: BlockNumHash,
103 provider: P,
104 evm_config: E,
105 wal_handle: WalHandle<N>,
106 ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
107 let (notification_tx, notification_rx) = mpsc::channel(1);
108 let (event_tx, event_rx) = mpsc::unbounded_channel();
109 let notifications =
110 ExExNotifications::new(node_head, provider, evm_config, notification_rx, wal_handle);
111
112 (
113 Self {
114 id: id.clone(),
115 metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
116 sender: PollSender::new(notification_tx),
117 receiver: event_rx,
118 next_notification_id: 0,
119 finished_height: None,
120 },
121 event_tx,
122 notifications,
123 )
124 }
125
126 fn send(
131 &mut self,
132 cx: &mut Context<'_>,
133 (notification_id, notification): &(usize, ExExNotification<N>),
134 ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
135 if let Some(finished_height) = self.finished_height {
136 match notification {
137 ExExNotification::ChainCommitted { new } => {
138 if finished_height.number >= new.tip().number() {
142 debug!(
143 target: "exex::manager",
144 exex_id = %self.id,
145 %notification_id,
146 ?finished_height,
147 new_tip = %new.tip().number(),
148 "Skipping notification"
149 );
150
151 self.next_notification_id = notification_id + 1;
152 return Poll::Ready(Ok(()))
153 }
154 }
155 ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
160 }
161 }
162
163 debug!(
164 target: "exex::manager",
165 exex_id = %self.id,
166 %notification_id,
167 "Reserving slot for notification"
168 );
169 match self.sender.poll_reserve(cx) {
170 Poll::Ready(Ok(())) => (),
171 other => return other,
172 }
173
174 debug!(
175 target: "exex::manager",
176 exex_id = %self.id,
177 %notification_id,
178 "Sending notification"
179 );
180 match self.sender.send_item(notification.clone()) {
181 Ok(()) => {
182 self.next_notification_id = notification_id + 1;
183 self.metrics.notifications_sent_total.increment(1);
184 Poll::Ready(Ok(()))
185 }
186 Err(err) => Poll::Ready(Err(err)),
187 }
188 }
189}
190
191#[derive(Metrics)]
193#[metrics(scope = "exex.manager")]
194pub struct ExExManagerMetrics {
195 max_capacity: Gauge,
197 current_capacity: Gauge,
199 buffer_size: Gauge,
203 num_exexs: Gauge,
205}
206
207#[derive(Debug)]
217pub struct ExExManager<P, N: NodePrimitives> {
218 provider: P,
220
221 exex_handles: Vec<ExExHandle<N>>,
223
224 handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
226
227 min_id: usize,
229 next_id: usize,
231 buffer: VecDeque<(usize, ExExNotification<N>)>,
236 max_capacity: usize,
238 current_capacity: Arc<AtomicUsize>,
242
243 is_ready: watch::Sender<bool>,
245
246 finished_height: watch::Sender<FinishedExExHeight>,
248
249 wal: Wal<N>,
251 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
253 wal_blocks_warning: usize,
255
256 handle: ExExManagerHandle<N>,
258 metrics: ExExManagerMetrics,
260}
261
262impl<P, N> ExExManager<P, N>
263where
264 N: NodePrimitives,
265{
266 pub fn new(
274 provider: P,
275 handles: Vec<ExExHandle<N>>,
276 max_capacity: usize,
277 wal: Wal<N>,
278 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
279 ) -> Self {
280 let num_exexs = handles.len();
281
282 let (handle_tx, handle_rx) = mpsc::unbounded_channel();
283 let (is_ready_tx, is_ready_rx) = watch::channel(true);
284 let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
285 FinishedExExHeight::NoExExs
286 } else {
287 FinishedExExHeight::NotReady
288 });
289
290 let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
291
292 let metrics = ExExManagerMetrics::default();
293 metrics.max_capacity.set(max_capacity as f64);
294 metrics.num_exexs.set(num_exexs as f64);
295
296 Self {
297 provider,
298
299 exex_handles: handles,
300
301 handle_rx,
302
303 min_id: 0,
304 next_id: 0,
305 buffer: VecDeque::with_capacity(max_capacity),
306 max_capacity,
307 current_capacity: Arc::clone(¤t_capacity),
308
309 is_ready: is_ready_tx,
310 finished_height: finished_height_tx,
311
312 wal,
313 finalized_header_stream,
314 wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
315
316 handle: ExExManagerHandle {
317 exex_tx: handle_tx,
318 num_exexs,
319 is_ready_receiver: is_ready_rx.clone(),
320 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
321 current_capacity,
322 finished_height: finished_height_rx,
323 },
324 metrics,
325 }
326 }
327
328 pub fn handle(&self) -> ExExManagerHandle<N> {
330 self.handle.clone()
331 }
332
333 pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
339 self.wal_blocks_warning = threshold;
340 self
341 }
342
343 fn update_capacity(&self) {
346 let capacity = self.max_capacity.saturating_sub(self.buffer.len());
347 self.current_capacity.store(capacity, Ordering::Relaxed);
348 self.metrics.current_capacity.set(capacity as f64);
349 self.metrics.buffer_size.set(self.buffer.len() as f64);
350
351 let _ = self.is_ready.send(capacity > 0);
354 }
355
356 fn push_notification(&mut self, notification: ExExNotification<N>) {
359 let next_id = self.next_id;
360 self.buffer.push_back((next_id, notification));
361 self.next_id += 1;
362 }
363}
364
365impl<P, N> ExExManager<P, N>
366where
367 P: HeaderProvider,
368 N: NodePrimitives,
369{
370 fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
375 debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
376
377 let exex_finished_heights = self
379 .exex_handles
380 .iter()
381 .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
383 .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
385 .map(|(exex_id, num_hash)| {
387 num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
388 self.provider
389 .is_known(num_hash.hash)
390 .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
392 })
393 })
394 .collect::<Result<Vec<_>, _>>()?;
396 if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
397 let lowest_finished_height = exex_finished_heights
401 .iter()
402 .copied()
403 .filter_map(|(_, num_hash, _)| num_hash)
404 .chain([(finalized_header.num_hash())])
405 .min_by_key(|num_hash| num_hash.number)
406 .unwrap();
407
408 self.wal.finalize(lowest_finished_height)?;
409 if self.wal.num_blocks() > self.wal_blocks_warning {
410 warn!(
411 target: "exex::manager",
412 blocks = ?self.wal.num_blocks(),
413 threshold = self.wal_blocks_warning,
414 "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
415 );
416 }
417 } else {
418 let unfinalized_exexes = exex_finished_heights
419 .into_iter()
420 .filter_map(|(exex_id, num_hash, is_canonical)| {
421 is_canonical.not().then_some((exex_id, num_hash))
422 })
423 .format_with(", ", |(exex_id, num_hash), f| {
424 f(&format_args!("{exex_id} = {num_hash:?}"))
425 })
426 .to_string();
429 debug!(
430 target: "exex::manager",
431 %unfinalized_exexes,
432 "Not all ExExes are on the canonical chain, can't finalize the WAL"
433 );
434 }
435
436 Ok(())
437 }
438}
439
440impl<P, N> Future for ExExManager<P, N>
441where
442 P: HeaderProvider + Unpin + 'static,
443 N: NodePrimitives,
444{
445 type Output = eyre::Result<()>;
446
447 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
459 let this = self.get_mut();
460
461 for exex in &mut this.exex_handles {
463 while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
464 debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
465 exex.metrics.events_sent_total.increment(1);
466 match event {
467 ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
468 }
469 }
470 }
471
472 let mut last_finalized_header = None;
474 while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
475 last_finalized_header = finalized_header;
476 }
477 if let Some(header) = last_finalized_header {
478 this.finalize_wal(header)?;
479 }
480
481 while this.buffer.len() < this.max_capacity {
483 if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
484 let committed_tip =
485 notification.committed_chain().map(|chain| chain.tip().number());
486 let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
487 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
488
489 match source {
492 ExExNotificationSource::BlockchainTree => {
493 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
494 this.wal.commit(¬ification)?;
495 }
496 ExExNotificationSource::Pipeline => {
497 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
498 }
499 }
500
501 this.push_notification(notification);
502 continue
503 }
504 break
505 }
506 let buffer_full = this.buffer.len() >= this.max_capacity;
507
508 this.update_capacity();
510
511 let mut min_id = usize::MAX;
513 for idx in (0..this.exex_handles.len()).rev() {
514 let mut exex = this.exex_handles.swap_remove(idx);
515
516 let notification_index = exex
519 .next_notification_id
520 .checked_sub(this.min_id)
521 .expect("exex expected notification ID outside the manager's range");
522 if let Some(notification) = this.buffer.get(notification_index) &&
523 let Poll::Ready(Err(err)) = exex.send(cx, notification)
524 {
525 return Poll::Ready(Err(err.into()))
527 }
528 min_id = min_id.min(exex.next_notification_id);
529 this.exex_handles.push(exex);
530 }
531
532 debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
534 this.buffer.retain(|&(id, _)| id >= min_id);
535 this.min_id = min_id;
536
537 this.update_capacity();
539
540 if buffer_full && this.buffer.len() < this.max_capacity {
542 debug!(target: "exex::manager", "Buffer has space again, waking up senders");
543 cx.waker().wake_by_ref();
544 }
545
546 let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
548 exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
549 });
550 if let Ok(finished_height) = finished_height {
551 let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
552 }
553
554 Poll::Pending
555 }
556}
557
558#[derive(Debug)]
560pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
561 exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
563 num_exexs: usize,
565 is_ready_receiver: watch::Receiver<bool>,
571 is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
574 current_capacity: Arc<AtomicUsize>,
576 finished_height: watch::Receiver<FinishedExExHeight>,
578}
579
580impl<N: NodePrimitives> ExExManagerHandle<N> {
581 pub fn empty() -> Self {
587 let (exex_tx, _) = mpsc::unbounded_channel();
588 let (_, is_ready_rx) = watch::channel(true);
589 let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
590
591 Self {
592 exex_tx,
593 num_exexs: 0,
594 is_ready_receiver: is_ready_rx.clone(),
595 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
596 current_capacity: Arc::new(AtomicUsize::new(0)),
597 finished_height: finished_height_rx,
598 }
599 }
600
601 pub fn send(
605 &self,
606 source: ExExNotificationSource,
607 notification: ExExNotification<N>,
608 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
609 self.exex_tx.send((source, notification))
610 }
611
612 pub async fn send_async(
617 &mut self,
618 source: ExExNotificationSource,
619 notification: ExExNotification<N>,
620 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
621 self.ready().await;
622 self.exex_tx.send((source, notification))
623 }
624
625 pub fn capacity(&self) -> usize {
627 self.current_capacity.load(Ordering::Relaxed)
628 }
629
630 pub fn has_capacity(&self) -> bool {
635 self.capacity() > 0
636 }
637
638 pub const fn has_exexs(&self) -> bool {
640 self.num_exexs > 0
641 }
642
643 pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
645 self.finished_height.clone()
646 }
647
648 pub async fn ready(&mut self) {
650 poll_fn(|cx| self.poll_ready(cx)).await
651 }
652
653 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
655 let rx = ready!(self.is_ready.poll(cx));
656 self.is_ready.set(make_wait_future(rx));
657 Poll::Ready(())
658 }
659}
660
661async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
663 let _ = rx.wait_for(|ready| *ready).await;
666 rx
667}
668
669impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
670 fn clone(&self) -> Self {
671 Self {
672 exex_tx: self.exex_tx.clone(),
673 num_exexs: self.num_exexs,
674 is_ready_receiver: self.is_ready_receiver.clone(),
675 is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
676 current_capacity: self.current_capacity.clone(),
677 finished_height: self.finished_height.clone(),
678 }
679 }
680}
681
682#[cfg(test)]
683mod tests {
684 use super::*;
685 use crate::wal::WalResult;
686 use alloy_primitives::B256;
687 use futures::{StreamExt, TryStreamExt};
688 use rand::Rng;
689 use reth_db_common::init::init_genesis;
690 use reth_evm_ethereum::EthEvmConfig;
691 use reth_primitives_traits::RecoveredBlock;
692 use reth_provider::{
693 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
694 BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
695 };
696 use reth_testing_utils::generators::{self, random_block, BlockParams};
697
698 fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
699 let (tx, rx) = watch::channel(None);
700 std::mem::forget(tx);
702 ForkChoiceStream::new(rx)
703 }
704
705 #[tokio::test]
706 async fn test_delivers_events() {
707 let temp_dir = tempfile::tempdir().unwrap();
708 let wal = Wal::new(temp_dir.path()).unwrap();
709
710 let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
711 "test_exex".to_string(),
712 Default::default(),
713 (),
714 EthEvmConfig::mainnet(),
715 wal.handle(),
716 );
717
718 let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
720 event_tx.send(event).unwrap();
721 let received_event = exex_handle.receiver.recv().await.unwrap();
722 assert_eq!(received_event, event);
723 }
724
725 #[tokio::test]
726 async fn test_has_exexs() {
727 let temp_dir = tempfile::tempdir().unwrap();
728 let wal = Wal::new(temp_dir.path()).unwrap();
729
730 let (exex_handle_1, _, _) = ExExHandle::new(
731 "test_exex_1".to_string(),
732 Default::default(),
733 (),
734 EthEvmConfig::mainnet(),
735 wal.handle(),
736 );
737
738 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
739 .handle
740 .has_exexs());
741
742 assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
743 .handle
744 .has_exexs());
745 }
746
747 #[tokio::test]
748 async fn test_has_capacity() {
749 let temp_dir = tempfile::tempdir().unwrap();
750 let wal = Wal::new(temp_dir.path()).unwrap();
751
752 let (exex_handle_1, _, _) = ExExHandle::new(
753 "test_exex_1".to_string(),
754 Default::default(),
755 (),
756 EthEvmConfig::mainnet(),
757 wal.handle(),
758 );
759
760 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
761 .handle
762 .has_capacity());
763
764 assert!(ExExManager::new(
765 (),
766 vec![exex_handle_1],
767 10,
768 wal,
769 empty_finalized_header_stream()
770 )
771 .handle
772 .has_capacity());
773 }
774
775 #[test]
776 fn test_push_notification() {
777 let temp_dir = tempfile::tempdir().unwrap();
778 let wal = Wal::new(temp_dir.path()).unwrap();
779
780 let (exex_handle, _, _) = ExExHandle::new(
781 "test_exex".to_string(),
782 Default::default(),
783 (),
784 EthEvmConfig::mainnet(),
785 wal.handle(),
786 );
787
788 let mut exex_manager =
790 ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
791
792 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
794 block1.set_hash(B256::new([0x01; 32]));
795 block1.set_block_number(10);
796
797 let notification1 = ExExNotification::ChainCommitted {
798 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
799 };
800
801 exex_manager.push_notification(notification1.clone());
803
804 assert_eq!(exex_manager.buffer.len(), 1);
806 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
807 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
808 assert_eq!(exex_manager.next_id, 1);
809
810 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
812 block2.set_hash(B256::new([0x02; 32]));
813 block2.set_block_number(20);
814
815 let notification2 = ExExNotification::ChainCommitted {
816 new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
817 };
818
819 exex_manager.push_notification(notification2.clone());
820
821 assert_eq!(exex_manager.buffer.len(), 2);
823 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
824 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
825 assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
826 assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
827 assert_eq!(exex_manager.next_id, 2);
828 }
829
830 #[test]
831 fn test_update_capacity() {
832 let temp_dir = tempfile::tempdir().unwrap();
833 let wal = Wal::new(temp_dir.path()).unwrap();
834
835 let (exex_handle, _, _) = ExExHandle::new(
836 "test_exex".to_string(),
837 Default::default(),
838 (),
839 EthEvmConfig::mainnet(),
840 wal.handle(),
841 );
842
843 let max_capacity = 5;
845 let mut exex_manager = ExExManager::new(
846 (),
847 vec![exex_handle],
848 max_capacity,
849 wal,
850 empty_finalized_header_stream(),
851 );
852
853 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
855 block1.set_hash(B256::new([0x01; 32]));
856 block1.set_block_number(10);
857
858 let notification1 = ExExNotification::ChainCommitted {
859 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
860 };
861
862 exex_manager.push_notification(notification1.clone());
863 exex_manager.push_notification(notification1);
864
865 exex_manager.update_capacity();
867
868 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
870
871 exex_manager.buffer.clear();
873 exex_manager.update_capacity();
874
875 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
877 }
878
879 #[tokio::test]
880 async fn test_updates_block_height() {
881 let temp_dir = tempfile::tempdir().unwrap();
882 let wal = Wal::new(temp_dir.path()).unwrap();
883
884 let provider_factory = create_test_provider_factory();
885
886 let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
887 "test_exex".to_string(),
888 Default::default(),
889 (),
890 EthEvmConfig::mainnet(),
891 wal.handle(),
892 );
893
894 assert!(exex_handle.finished_height.is_none());
896
897 let block = BlockNumHash::new(42, B256::random());
899 event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
900
901 let exex_manager = ExExManager::new(
903 provider_factory,
904 vec![exex_handle],
905 10,
906 Wal::new(temp_dir.path()).unwrap(),
907 empty_finalized_header_stream(),
908 );
909
910 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
911
912 let mut pinned_manager = std::pin::pin!(exex_manager);
914 let _ = pinned_manager.as_mut().poll(&mut cx);
915
916 let updated_exex_handle = &pinned_manager.exex_handles[0];
918 assert_eq!(updated_exex_handle.finished_height, Some(block));
919
920 let mut receiver = pinned_manager.handle.finished_height();
922
923 receiver.changed().await.unwrap();
925
926 let finished_height = *receiver.borrow();
928
929 assert_eq!(finished_height, FinishedExExHeight::Height(42));
931 }
932
933 #[tokio::test]
934 async fn test_updates_block_height_lower() {
935 let temp_dir = tempfile::tempdir().unwrap();
936 let wal = Wal::new(temp_dir.path()).unwrap();
937
938 let provider_factory = create_test_provider_factory();
939
940 let (exex_handle1, event_tx1, _) = ExExHandle::new(
942 "test_exex1".to_string(),
943 Default::default(),
944 (),
945 EthEvmConfig::mainnet(),
946 wal.handle(),
947 );
948 let (exex_handle2, event_tx2, _) = ExExHandle::new(
949 "test_exex2".to_string(),
950 Default::default(),
951 (),
952 EthEvmConfig::mainnet(),
953 wal.handle(),
954 );
955
956 let block1 = BlockNumHash::new(42, B256::random());
957 let block2 = BlockNumHash::new(10, B256::random());
958
959 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
961 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
962
963 let exex_manager = ExExManager::new(
964 provider_factory,
965 vec![exex_handle1, exex_handle2],
966 10,
967 Wal::new(temp_dir.path()).unwrap(),
968 empty_finalized_header_stream(),
969 );
970
971 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
972
973 let mut pinned_manager = std::pin::pin!(exex_manager);
974
975 let _ = pinned_manager.as_mut().poll(&mut cx);
976
977 let mut receiver = pinned_manager.handle.finished_height();
979
980 receiver.changed().await.unwrap();
982
983 let finished_height = *receiver.borrow();
985
986 assert_eq!(finished_height, FinishedExExHeight::Height(10));
988 }
989
990 #[tokio::test]
991 async fn test_updates_block_height_greater() {
992 let temp_dir = tempfile::tempdir().unwrap();
993 let wal = Wal::new(temp_dir.path()).unwrap();
994
995 let provider_factory = create_test_provider_factory();
996
997 let (exex_handle1, event_tx1, _) = ExExHandle::new(
999 "test_exex1".to_string(),
1000 Default::default(),
1001 (),
1002 EthEvmConfig::mainnet(),
1003 wal.handle(),
1004 );
1005 let (exex_handle2, event_tx2, _) = ExExHandle::new(
1006 "test_exex2".to_string(),
1007 Default::default(),
1008 (),
1009 EthEvmConfig::mainnet(),
1010 wal.handle(),
1011 );
1012
1013 assert!(exex_handle1.finished_height.is_none());
1015
1016 let block1 = BlockNumHash::new(42, B256::random());
1017 let block2 = BlockNumHash::new(100, B256::random());
1018
1019 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
1021 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
1022
1023 let exex_manager = ExExManager::new(
1024 provider_factory,
1025 vec![exex_handle1, exex_handle2],
1026 10,
1027 Wal::new(temp_dir.path()).unwrap(),
1028 empty_finalized_header_stream(),
1029 );
1030
1031 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1032
1033 let mut pinned_manager = std::pin::pin!(exex_manager);
1034
1035 let _ = pinned_manager.as_mut().poll(&mut cx);
1036
1037 let mut receiver = pinned_manager.handle.finished_height();
1039
1040 receiver.changed().await.unwrap();
1042
1043 let finished_height = *receiver.borrow();
1045
1046 assert_eq!(finished_height, FinishedExExHeight::Height(42));
1048
1049 }
1053
1054 #[tokio::test]
1055 async fn test_exex_manager_capacity() {
1056 let temp_dir = tempfile::tempdir().unwrap();
1057 let wal = Wal::new(temp_dir.path()).unwrap();
1058
1059 let provider_factory = create_test_provider_factory();
1060
1061 let (exex_handle_1, _, _) = ExExHandle::new(
1062 "test_exex_1".to_string(),
1063 Default::default(),
1064 (),
1065 EthEvmConfig::mainnet(),
1066 wal.handle(),
1067 );
1068
1069 let max_capacity = 2;
1071 let exex_manager = ExExManager::new(
1072 provider_factory,
1073 vec![exex_handle_1],
1074 max_capacity,
1075 Wal::new(temp_dir.path()).unwrap(),
1076 empty_finalized_header_stream(),
1077 );
1078
1079 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1080
1081 let notification = ExExNotification::ChainCommitted {
1083 new: Arc::new(Chain::new(
1084 vec![Default::default()],
1085 Default::default(),
1086 Default::default(),
1087 )),
1088 };
1089
1090 exex_manager
1092 .handle
1093 .exex_tx
1094 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1095 .unwrap();
1096 exex_manager
1097 .handle
1098 .exex_tx
1099 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1100 .unwrap();
1101 exex_manager
1102 .handle
1103 .exex_tx
1104 .send((ExExNotificationSource::BlockchainTree, notification))
1105 .unwrap();
1106
1107 let mut pinned_manager = std::pin::pin!(exex_manager);
1109
1110 assert_eq!(pinned_manager.next_id, 0);
1112 assert_eq!(pinned_manager.buffer.len(), 0);
1113
1114 let _ = pinned_manager.as_mut().poll(&mut cx);
1115
1116 assert_eq!(pinned_manager.next_id, 2);
1118 assert_eq!(pinned_manager.buffer.len(), 2);
1119 }
1120
1121 #[tokio::test]
1122 async fn exex_handle_new() {
1123 let provider_factory = create_test_provider_factory();
1124 init_genesis(&provider_factory).unwrap();
1125 let provider = BlockchainProvider::new(provider_factory).unwrap();
1126
1127 let temp_dir = tempfile::tempdir().unwrap();
1128 let wal = Wal::new(temp_dir.path()).unwrap();
1129
1130 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1131 "test_exex".to_string(),
1132 Default::default(),
1133 provider,
1134 EthEvmConfig::mainnet(),
1135 wal.handle(),
1136 );
1137
1138 assert_eq!(exex_handle.id, "test_exex");
1140 assert_eq!(exex_handle.next_notification_id, 0);
1141
1142 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1144 block1.set_hash(B256::new([0x01; 32]));
1145 block1.set_block_number(10);
1146
1147 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1148 block2.set_hash(B256::new([0x02; 32]));
1149 block2.set_block_number(11);
1150
1151 let notification = ExExNotification::ChainCommitted {
1153 new: Arc::new(Chain::new(
1154 vec![Default::default()],
1155 Default::default(),
1156 Default::default(),
1157 )),
1158 };
1159
1160 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1161
1162 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1164 Poll::Ready(Ok(())) => {
1165 let received_notification = notifications.next().await.unwrap().unwrap();
1166 assert_eq!(received_notification, notification);
1167 }
1168 Poll::Pending => panic!("Notification send is pending"),
1169 Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1170 }
1171
1172 assert_eq!(exex_handle.next_notification_id, 23);
1174 }
1175
1176 #[tokio::test]
1177 async fn test_notification_if_finished_height_gt_chain_tip() {
1178 let provider_factory = create_test_provider_factory();
1179 init_genesis(&provider_factory).unwrap();
1180 let provider = BlockchainProvider::new(provider_factory).unwrap();
1181
1182 let temp_dir = tempfile::tempdir().unwrap();
1183 let wal = Wal::new(temp_dir.path()).unwrap();
1184
1185 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1186 "test_exex".to_string(),
1187 Default::default(),
1188 provider,
1189 EthEvmConfig::mainnet(),
1190 wal.handle(),
1191 );
1192
1193 exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1195
1196 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1197 block1.set_hash(B256::new([0x01; 32]));
1198 block1.set_block_number(10);
1199
1200 let notification = ExExNotification::ChainCommitted {
1201 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1202 };
1203
1204 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1205
1206 match exex_handle.send(&mut cx, &(22, notification)) {
1208 Poll::Ready(Ok(())) => {
1209 poll_fn(|cx| {
1210 assert!(notifications.poll_next_unpin(cx).is_pending());
1213 Poll::Ready(())
1214 })
1215 .await;
1216 }
1217 Poll::Pending | Poll::Ready(Err(_)) => {
1218 panic!("Notification should not be pending or fail");
1219 }
1220 }
1221
1222 assert_eq!(exex_handle.next_notification_id, 23);
1224 }
1225
1226 #[tokio::test]
1227 async fn test_sends_chain_reorged_notification() {
1228 let provider_factory = create_test_provider_factory();
1229 init_genesis(&provider_factory).unwrap();
1230 let provider = BlockchainProvider::new(provider_factory).unwrap();
1231
1232 let temp_dir = tempfile::tempdir().unwrap();
1233 let wal = Wal::new(temp_dir.path()).unwrap();
1234
1235 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1236 "test_exex".to_string(),
1237 Default::default(),
1238 provider,
1239 EthEvmConfig::mainnet(),
1240 wal.handle(),
1241 );
1242
1243 let notification = ExExNotification::ChainReorged {
1244 old: Arc::new(Chain::default()),
1245 new: Arc::new(Chain::default()),
1246 };
1247
1248 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1251
1252 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1253
1254 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1256 Poll::Ready(Ok(())) => {
1257 let received_notification = notifications.next().await.unwrap().unwrap();
1258 assert_eq!(received_notification, notification);
1259 }
1260 Poll::Pending | Poll::Ready(Err(_)) => {
1261 panic!("Notification should not be pending or fail")
1262 }
1263 }
1264
1265 assert_eq!(exex_handle.next_notification_id, 23);
1267 }
1268
1269 #[tokio::test]
1270 async fn test_sends_chain_reverted_notification() {
1271 let provider_factory = create_test_provider_factory();
1272 init_genesis(&provider_factory).unwrap();
1273 let provider = BlockchainProvider::new(provider_factory).unwrap();
1274
1275 let temp_dir = tempfile::tempdir().unwrap();
1276 let wal = Wal::new(temp_dir.path()).unwrap();
1277
1278 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1279 "test_exex".to_string(),
1280 Default::default(),
1281 provider,
1282 EthEvmConfig::mainnet(),
1283 wal.handle(),
1284 );
1285
1286 let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1287
1288 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1291
1292 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1293
1294 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1296 Poll::Ready(Ok(())) => {
1297 let received_notification = notifications.next().await.unwrap().unwrap();
1298 assert_eq!(received_notification, notification);
1299 }
1300 Poll::Pending | Poll::Ready(Err(_)) => {
1301 panic!("Notification should not be pending or fail")
1302 }
1303 }
1304
1305 assert_eq!(exex_handle.next_notification_id, 23);
1307 }
1308
1309 #[tokio::test]
1310 async fn test_exex_wal() -> eyre::Result<()> {
1311 reth_tracing::init_test_tracing();
1312
1313 let mut rng = generators::rng();
1314
1315 let provider_factory = create_test_provider_factory();
1316 let genesis_hash = init_genesis(&provider_factory).unwrap();
1317 let genesis_block = provider_factory
1318 .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1319 .unwrap()
1320 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1321
1322 let block = random_block(
1323 &mut rng,
1324 genesis_block.number + 1,
1325 BlockParams { parent: Some(genesis_hash), ..Default::default() },
1326 )
1327 .try_recover()
1328 .unwrap();
1329 let provider_rw = provider_factory.database_provider_rw().unwrap();
1330 provider_rw.insert_block(&block).unwrap();
1331 provider_rw.commit().unwrap();
1332
1333 let provider = BlockchainProvider::new(provider_factory).unwrap();
1334
1335 let temp_dir = tempfile::tempdir().unwrap();
1336 let wal = Wal::new(temp_dir.path()).unwrap();
1337
1338 let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1339 "test_exex".to_string(),
1340 Default::default(),
1341 provider.clone(),
1342 EthEvmConfig::mainnet(),
1343 wal.handle(),
1344 );
1345
1346 let genesis_notification = ExExNotification::ChainCommitted {
1347 new: Arc::new(Chain::new(
1348 vec![genesis_block.clone()],
1349 Default::default(),
1350 Default::default(),
1351 )),
1352 };
1353 let notification = ExExNotification::ChainCommitted {
1354 new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
1355 };
1356
1357 let (finalized_headers_tx, rx) = watch::channel(None);
1358 finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1359 let finalized_header_stream = ForkChoiceStream::new(rx);
1360
1361 let mut exex_manager = std::pin::pin!(ExExManager::new(
1362 provider,
1363 vec![exex_handle],
1364 2,
1365 wal,
1366 finalized_header_stream
1367 ));
1368
1369 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1370
1371 exex_manager
1372 .handle()
1373 .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1374 exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1375
1376 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1377 assert_eq!(
1378 notifications.try_poll_next_unpin(&mut cx)?,
1379 Poll::Ready(Some(genesis_notification))
1380 );
1381 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1382 assert_eq!(
1383 notifications.try_poll_next_unpin(&mut cx)?,
1384 Poll::Ready(Some(notification.clone()))
1385 );
1386 assert_eq!(
1388 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1389 std::slice::from_ref(¬ification)
1390 );
1391
1392 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1393 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1394 assert_eq!(
1396 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1397 std::slice::from_ref(¬ification)
1398 );
1399
1400 events_tx
1402 .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1403 .unwrap();
1404
1405 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1406 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1407 assert_eq!(
1410 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1411 std::slice::from_ref(¬ification)
1412 );
1413
1414 events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1416
1417 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1418 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1419 assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1421
1422 Ok(())
1423 }
1424
1425 #[tokio::test]
1426 async fn test_deadlock_manager_wakes_after_buffer_clears() {
1427 let temp_dir = tempfile::tempdir().unwrap();
1431 let wal = Wal::new(temp_dir.path()).unwrap();
1432 let provider_factory = create_test_provider_factory();
1433 init_genesis(&provider_factory).unwrap();
1434 let provider = BlockchainProvider::new(provider_factory.clone()).unwrap();
1435
1436 let (exex_handle, _, mut notifications) = ExExHandle::new(
1438 "test_exex".to_string(),
1439 Default::default(),
1440 provider,
1441 EthEvmConfig::mainnet(),
1442 wal.handle(),
1443 );
1444
1445 let max_capacity = 2;
1446 let exex_manager = ExExManager::new(
1447 provider_factory,
1448 vec![exex_handle],
1449 max_capacity,
1450 wal,
1451 empty_finalized_header_stream(),
1452 );
1453
1454 let manager_handle = exex_manager.handle();
1455
1456 tokio::spawn(async move {
1458 exex_manager.await.ok();
1459 });
1460
1461 let mut rng = generators::rng();
1463 let mut make_notif = |id: u64| {
1464 let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
1465 ExExNotification::ChainCommitted {
1466 new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
1467 }
1468 };
1469
1470 manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap();
1471
1472 manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap();
1476
1477 let _ = notifications.next().await.unwrap();
1485
1486 let result =
1491 tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await;
1492
1493 assert!(
1494 result.is_ok(),
1495 "Deadlock detected! Manager failed to wake up and process Pending Item #100."
1496 );
1497 }
1498}