1use crate::{
2 wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::ConfigureEvm;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18 collections::VecDeque,
19 fmt::Debug,
20 future::{poll_fn, Future},
21 ops::Not,
22 pin::Pin,
23 sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26 },
27 task::{ready, Context, Poll},
28};
29use tokio::sync::{
30 mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31 watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default capacity of the ExEx manager's notification buffer.
///
/// NOTE(review): this constant is not referenced in this part of the file; presumably callers
/// pass it as `max_capacity` to [`ExExManager::new`] — confirm against the call sites.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;

/// Default number of blocks in the WAL above which a warning is logged.
///
/// [`ExExManager::new`] uses this value as the initial warning threshold; it can be overridden
/// with [`ExExManager::with_wal_blocks_warning`].
pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
49
/// The source of a notification handed to the [`ExExManager`].
///
/// The manager inspects the source to decide whether the notification is committed to the WAL
/// before it is buffered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The notification was sent from the pipeline. The manager skips the WAL commit for it.
    Pipeline,
    /// The notification was sent from the blockchain tree. The manager commits it to the WAL.
    BlockchainTree,
}
61
/// Metrics recorded for a single ExEx.
#[derive(Metrics)]
#[metrics(scope = "exex")]
struct ExExMetrics {
    /// The total number of notifications sent to the ExEx.
    notifications_sent_total: Counter,
    /// The total number of events the ExEx has sent to the manager.
    events_sent_total: Counter,
}
71
/// The manager-side handle to a single ExEx.
///
/// It holds the sending half of the notification channel and the receiving half of the event
/// channel that connect the manager with the ExEx.
#[derive(Debug)]
pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
    /// The identifier of the ExEx, used in logs and as the `exex` metrics label.
    id: String,
    /// Metrics for this ExEx.
    metrics: ExExMetrics,
    /// Channel used to send [`ExExNotification`]s to the ExEx.
    sender: PollSender<ExExNotification<N>>,
    /// Channel used to receive [`ExExEvent`]s from the ExEx.
    receiver: UnboundedReceiver<ExExEvent>,
    /// The ID of the next notification to send to this ExEx.
    next_notification_id: usize,
    /// The height reported by the most recent [`ExExEvent::FinishedHeight`] event.
    ///
    /// `None` until the ExEx has sent its first such event.
    finished_height: Option<BlockNumHash>,
}
94
95impl<N: NodePrimitives> ExExHandle<N> {
96 pub fn new<P, E: ConfigureEvm<Primitives = N>>(
101 id: String,
102 node_head: BlockNumHash,
103 provider: P,
104 evm_config: E,
105 wal_handle: WalHandle<N>,
106 ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
107 let (notification_tx, notification_rx) = mpsc::channel(1);
108 let (event_tx, event_rx) = mpsc::unbounded_channel();
109 let notifications =
110 ExExNotifications::new(node_head, provider, evm_config, notification_rx, wal_handle);
111
112 (
113 Self {
114 id: id.clone(),
115 metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
116 sender: PollSender::new(notification_tx),
117 receiver: event_rx,
118 next_notification_id: 0,
119 finished_height: None,
120 },
121 event_tx,
122 notifications,
123 )
124 }
125
126 fn send(
131 &mut self,
132 cx: &mut Context<'_>,
133 (notification_id, notification): &(usize, ExExNotification<N>),
134 ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
135 if let Some(finished_height) = self.finished_height {
136 match notification {
137 ExExNotification::ChainCommitted { new } => {
138 if finished_height.number >= new.tip().number() {
142 debug!(
143 target: "exex::manager",
144 exex_id = %self.id,
145 %notification_id,
146 ?finished_height,
147 new_tip = %new.tip().number(),
148 "Skipping notification"
149 );
150
151 self.next_notification_id = notification_id + 1;
152 return Poll::Ready(Ok(()))
153 }
154 }
155 ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
160 }
161 }
162
163 debug!(
164 target: "exex::manager",
165 exex_id = %self.id,
166 %notification_id,
167 "Reserving slot for notification"
168 );
169 match self.sender.poll_reserve(cx) {
170 Poll::Ready(Ok(())) => (),
171 other => return other,
172 }
173
174 debug!(
175 target: "exex::manager",
176 exex_id = %self.id,
177 %notification_id,
178 "Sending notification"
179 );
180 match self.sender.send_item(notification.clone()) {
181 Ok(()) => {
182 self.next_notification_id = notification_id + 1;
183 self.metrics.notifications_sent_total.increment(1);
184 Poll::Ready(Ok(()))
185 }
186 Err(err) => Poll::Ready(Err(err)),
187 }
188 }
189}
190
/// Metrics for the ExEx manager.
#[derive(Metrics)]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
    /// Max size of the internal notification buffer.
    max_capacity: Gauge,
    /// Current capacity of the internal notification buffer.
    current_capacity: Gauge,
    /// Current number of notifications held in the internal buffer.
    buffer_size: Gauge,
    /// Number of ExExes registered with the manager.
    num_exexs: Gauge,
}
206
/// The execution extension manager.
///
/// The manager receives notifications through its [`ExExManagerHandle`], buffers them, forwards
/// them to every registered ExEx, and tracks the heights the ExExes report as finished.
#[derive(Debug)]
pub struct ExExManager<P, N: NodePrimitives> {
    /// Provider used to look up block hashes when finalizing the WAL.
    provider: P,

    /// Handles to the registered ExExes.
    exex_handles: Vec<ExExHandle<N>>,

    /// Receiving half of the channel that [`ExExManagerHandle`]s send notifications on.
    handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,

    /// The lowest notification ID that is still needed by some ExEx.
    min_id: usize,
    /// The ID that will be assigned to the next buffered notification.
    next_id: usize,
    /// Buffered notifications together with their IDs, in insertion order.
    buffer: VecDeque<(usize, ExExNotification<N>)>,
    /// Maximum number of notifications the manager pulls into the buffer.
    max_capacity: usize,
    /// Remaining buffer capacity, shared with the manager handles.
    current_capacity: Arc<AtomicUsize>,

    /// Signals to the handles whether the buffer currently has free capacity.
    is_ready: watch::Sender<bool>,

    /// Publishes the lowest finished height across all ExExes.
    finished_height: watch::Sender<FinishedExExHeight>,

    /// Write-ahead log that notifications from the blockchain tree are committed to.
    wal: Wal<N>,
    /// Stream of finalized headers that triggers WAL finalization.
    finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
    /// Number of WAL blocks above which a warning is logged after finalization.
    wal_blocks_warning: usize,

    /// The handle that is cloned and returned by [`ExExManager::handle`].
    handle: ExExManagerHandle<N>,
    /// Metrics for the manager.
    metrics: ExExManagerMetrics,
}
261
262impl<P, N> ExExManager<P, N>
263where
264 N: NodePrimitives,
265{
266 pub fn new(
274 provider: P,
275 handles: Vec<ExExHandle<N>>,
276 max_capacity: usize,
277 wal: Wal<N>,
278 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
279 ) -> Self {
280 let num_exexs = handles.len();
281
282 let (handle_tx, handle_rx) = mpsc::unbounded_channel();
283 let (is_ready_tx, is_ready_rx) = watch::channel(true);
284 let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
285 FinishedExExHeight::NoExExs
286 } else {
287 FinishedExExHeight::NotReady
288 });
289
290 let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
291
292 let metrics = ExExManagerMetrics::default();
293 metrics.max_capacity.set(max_capacity as f64);
294 metrics.num_exexs.set(num_exexs as f64);
295
296 Self {
297 provider,
298
299 exex_handles: handles,
300
301 handle_rx,
302
303 min_id: 0,
304 next_id: 0,
305 buffer: VecDeque::with_capacity(max_capacity),
306 max_capacity,
307 current_capacity: Arc::clone(¤t_capacity),
308
309 is_ready: is_ready_tx,
310 finished_height: finished_height_tx,
311
312 wal,
313 finalized_header_stream,
314 wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
315
316 handle: ExExManagerHandle {
317 exex_tx: handle_tx,
318 num_exexs,
319 is_ready_receiver: is_ready_rx.clone(),
320 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
321 current_capacity,
322 finished_height: finished_height_rx,
323 },
324 metrics,
325 }
326 }
327
328 pub fn handle(&self) -> ExExManagerHandle<N> {
330 self.handle.clone()
331 }
332
333 pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
339 self.wal_blocks_warning = threshold;
340 self
341 }
342
343 fn update_capacity(&self) {
346 let capacity = self.max_capacity.saturating_sub(self.buffer.len());
347 self.current_capacity.store(capacity, Ordering::Relaxed);
348 self.metrics.current_capacity.set(capacity as f64);
349 self.metrics.buffer_size.set(self.buffer.len() as f64);
350
351 let _ = self.is_ready.send(capacity > 0);
354 }
355
356 fn push_notification(&mut self, notification: ExExNotification<N>) {
359 let next_id = self.next_id;
360 self.buffer.push_back((next_id, notification));
361 self.next_id += 1;
362 }
363}
364
365impl<P, N> ExExManager<P, N>
366where
367 P: HeaderProvider,
368 N: NodePrimitives,
369{
370 fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
375 debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
376
377 let exex_finished_heights = self
379 .exex_handles
380 .iter()
381 .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
383 .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
385 .map(|(exex_id, num_hash)| {
387 num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
388 self.provider
389 .is_known(num_hash.hash)
390 .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
392 })
393 })
394 .collect::<Result<Vec<_>, _>>()?;
396 if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
397 let lowest_finished_height = exex_finished_heights
401 .iter()
402 .copied()
403 .filter_map(|(_, num_hash, _)| num_hash)
404 .chain([(finalized_header.num_hash())])
405 .min_by_key(|num_hash| num_hash.number)
406 .unwrap();
407
408 self.wal.finalize(lowest_finished_height)?;
409 if self.wal.num_blocks() > self.wal_blocks_warning {
410 warn!(
411 target: "exex::manager",
412 blocks = ?self.wal.num_blocks(),
413 threshold = self.wal_blocks_warning,
414 "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
415 );
416 }
417 } else {
418 let unfinalized_exexes = exex_finished_heights
419 .into_iter()
420 .filter_map(|(exex_id, num_hash, is_canonical)| {
421 is_canonical.not().then_some((exex_id, num_hash))
422 })
423 .format_with(", ", |(exex_id, num_hash), f| {
424 f(&format_args!("{exex_id} = {num_hash:?}"))
425 })
426 .to_string();
429 debug!(
430 target: "exex::manager",
431 %unfinalized_exexes,
432 "Not all ExExes are on the canonical chain, can't finalize the WAL"
433 );
434 }
435
436 Ok(())
437 }
438}
439
440impl<P, N> Future for ExExManager<P, N>
441where
442 P: HeaderProvider + Unpin + 'static,
443 N: NodePrimitives,
444{
445 type Output = eyre::Result<()>;
446
447 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
459 let this = self.get_mut();
460
461 for exex in &mut this.exex_handles {
463 while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
464 debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
465 exex.metrics.events_sent_total.increment(1);
466 match event {
467 ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
468 }
469 }
470 }
471
472 let mut last_finalized_header = None;
474 while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
475 last_finalized_header = finalized_header;
476 }
477 if let Some(header) = last_finalized_header {
478 this.finalize_wal(header)?;
479 }
480
481 while this.buffer.len() < this.max_capacity {
483 if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
484 let committed_tip =
485 notification.committed_chain().map(|chain| chain.tip().number());
486 let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
487 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
488
489 match source {
492 ExExNotificationSource::BlockchainTree => {
493 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
494 this.wal.commit(¬ification)?;
495 }
496 ExExNotificationSource::Pipeline => {
497 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
498 }
499 }
500
501 this.push_notification(notification);
502 continue
503 }
504 break
505 }
506 let buffer_full = this.buffer.len() >= this.max_capacity;
507
508 let mut min_id = usize::MAX;
510 for idx in (0..this.exex_handles.len()).rev() {
511 let mut exex = this.exex_handles.swap_remove(idx);
512
513 let notification_index = exex
516 .next_notification_id
517 .checked_sub(this.min_id)
518 .expect("exex expected notification ID outside the manager's range");
519 if let Some(notification) = this.buffer.get(notification_index) &&
520 let Poll::Ready(Err(err)) = exex.send(cx, notification)
521 {
522 return Poll::Ready(Err(err.into()))
524 }
525 min_id = min_id.min(exex.next_notification_id);
526 this.exex_handles.push(exex);
527 }
528
529 debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
531 this.buffer.retain(|&(id, _)| id >= min_id);
532 this.min_id = min_id;
533
534 this.update_capacity();
536
537 if buffer_full && this.buffer.len() < this.max_capacity {
539 debug!(target: "exex::manager", "Buffer has space again, waking up senders");
540 cx.waker().wake_by_ref();
541 }
542
543 let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
545 exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
546 });
547 if let Ok(finished_height) = finished_height {
548 let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
549 }
550
551 Poll::Pending
552 }
553}
554
/// A handle to communicate with the [`ExExManager`].
#[derive(Debug)]
pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
    /// Channel used to send notifications, tagged with their source, to the manager.
    exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
    /// The number of ExExes registered with the manager.
    num_exexs: usize,
    /// Receiver of the manager's readiness flag.
    ///
    /// Kept so that a cloned handle can build its own `is_ready` future from it.
    is_ready_receiver: watch::Receiver<bool>,
    /// Reusable future that resolves once the manager signals that it is ready.
    is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
    /// The remaining capacity of the manager's buffer, shared with the manager.
    current_capacity: Arc<AtomicUsize>,
    /// Receiver of the lowest finished height across all ExExes.
    finished_height: watch::Receiver<FinishedExExHeight>,
}
576
577impl<N: NodePrimitives> ExExManagerHandle<N> {
578 pub fn empty() -> Self {
584 let (exex_tx, _) = mpsc::unbounded_channel();
585 let (_, is_ready_rx) = watch::channel(true);
586 let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
587
588 Self {
589 exex_tx,
590 num_exexs: 0,
591 is_ready_receiver: is_ready_rx.clone(),
592 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
593 current_capacity: Arc::new(AtomicUsize::new(0)),
594 finished_height: finished_height_rx,
595 }
596 }
597
598 pub fn send(
602 &self,
603 source: ExExNotificationSource,
604 notification: ExExNotification<N>,
605 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
606 self.exex_tx.send((source, notification))
607 }
608
609 pub async fn send_async(
614 &mut self,
615 source: ExExNotificationSource,
616 notification: ExExNotification<N>,
617 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
618 self.ready().await;
619 self.exex_tx.send((source, notification))
620 }
621
622 pub fn capacity(&self) -> usize {
624 self.current_capacity.load(Ordering::Relaxed)
625 }
626
627 pub fn has_capacity(&self) -> bool {
632 self.capacity() > 0
633 }
634
635 pub const fn has_exexs(&self) -> bool {
637 self.num_exexs > 0
638 }
639
640 pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
642 self.finished_height.clone()
643 }
644
645 pub async fn ready(&mut self) {
647 poll_fn(|cx| self.poll_ready(cx)).await
648 }
649
650 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
652 let rx = ready!(self.is_ready.poll(cx));
653 self.is_ready.set(make_wait_future(rx));
654 Poll::Ready(())
655 }
656}
657
658async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
660 let _ = rx.wait_for(|ready| *ready).await;
663 rx
664}
665
666impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
667 fn clone(&self) -> Self {
668 Self {
669 exex_tx: self.exex_tx.clone(),
670 num_exexs: self.num_exexs,
671 is_ready_receiver: self.is_ready_receiver.clone(),
672 is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
673 current_capacity: self.current_capacity.clone(),
674 finished_height: self.finished_height.clone(),
675 }
676 }
677}
678
679#[cfg(test)]
680mod tests {
681 use super::*;
682 use crate::wal::WalResult;
683 use alloy_primitives::B256;
684 use futures::{StreamExt, TryStreamExt};
685 use rand::Rng;
686 use reth_db_common::init::init_genesis;
687 use reth_evm_ethereum::EthEvmConfig;
688 use reth_primitives_traits::RecoveredBlock;
689 use reth_provider::{
690 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
691 BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
692 };
693 use reth_testing_utils::generators::{self, random_block, BlockParams};
694
695 fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
696 let (tx, rx) = watch::channel(None);
697 std::mem::forget(tx);
699 ForkChoiceStream::new(rx)
700 }
701
702 #[tokio::test]
703 async fn test_delivers_events() {
704 let temp_dir = tempfile::tempdir().unwrap();
705 let wal = Wal::new(temp_dir.path()).unwrap();
706
707 let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
708 "test_exex".to_string(),
709 Default::default(),
710 (),
711 EthEvmConfig::mainnet(),
712 wal.handle(),
713 );
714
715 let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
717 event_tx.send(event).unwrap();
718 let received_event = exex_handle.receiver.recv().await.unwrap();
719 assert_eq!(received_event, event);
720 }
721
722 #[tokio::test]
723 async fn test_has_exexs() {
724 let temp_dir = tempfile::tempdir().unwrap();
725 let wal = Wal::new(temp_dir.path()).unwrap();
726
727 let (exex_handle_1, _, _) = ExExHandle::new(
728 "test_exex_1".to_string(),
729 Default::default(),
730 (),
731 EthEvmConfig::mainnet(),
732 wal.handle(),
733 );
734
735 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
736 .handle
737 .has_exexs());
738
739 assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
740 .handle
741 .has_exexs());
742 }
743
744 #[tokio::test]
745 async fn test_has_capacity() {
746 let temp_dir = tempfile::tempdir().unwrap();
747 let wal = Wal::new(temp_dir.path()).unwrap();
748
749 let (exex_handle_1, _, _) = ExExHandle::new(
750 "test_exex_1".to_string(),
751 Default::default(),
752 (),
753 EthEvmConfig::mainnet(),
754 wal.handle(),
755 );
756
757 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
758 .handle
759 .has_capacity());
760
761 assert!(ExExManager::new(
762 (),
763 vec![exex_handle_1],
764 10,
765 wal,
766 empty_finalized_header_stream()
767 )
768 .handle
769 .has_capacity());
770 }
771
772 #[test]
773 fn test_push_notification() {
774 let temp_dir = tempfile::tempdir().unwrap();
775 let wal = Wal::new(temp_dir.path()).unwrap();
776
777 let (exex_handle, _, _) = ExExHandle::new(
778 "test_exex".to_string(),
779 Default::default(),
780 (),
781 EthEvmConfig::mainnet(),
782 wal.handle(),
783 );
784
785 let mut exex_manager =
787 ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
788
789 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
791 block1.set_hash(B256::new([0x01; 32]));
792 block1.set_block_number(10);
793
794 let notification1 = ExExNotification::ChainCommitted {
795 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
796 };
797
798 exex_manager.push_notification(notification1.clone());
800
801 assert_eq!(exex_manager.buffer.len(), 1);
803 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
804 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
805 assert_eq!(exex_manager.next_id, 1);
806
807 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
809 block2.set_hash(B256::new([0x02; 32]));
810 block2.set_block_number(20);
811
812 let notification2 = ExExNotification::ChainCommitted {
813 new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
814 };
815
816 exex_manager.push_notification(notification2.clone());
817
818 assert_eq!(exex_manager.buffer.len(), 2);
820 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
821 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
822 assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
823 assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
824 assert_eq!(exex_manager.next_id, 2);
825 }
826
827 #[test]
828 fn test_update_capacity() {
829 let temp_dir = tempfile::tempdir().unwrap();
830 let wal = Wal::new(temp_dir.path()).unwrap();
831
832 let (exex_handle, _, _) = ExExHandle::new(
833 "test_exex".to_string(),
834 Default::default(),
835 (),
836 EthEvmConfig::mainnet(),
837 wal.handle(),
838 );
839
840 let max_capacity = 5;
842 let mut exex_manager = ExExManager::new(
843 (),
844 vec![exex_handle],
845 max_capacity,
846 wal,
847 empty_finalized_header_stream(),
848 );
849
850 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
852 block1.set_hash(B256::new([0x01; 32]));
853 block1.set_block_number(10);
854
855 let notification1 = ExExNotification::ChainCommitted {
856 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
857 };
858
859 exex_manager.push_notification(notification1.clone());
860 exex_manager.push_notification(notification1);
861
862 exex_manager.update_capacity();
864
865 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
867
868 exex_manager.buffer.clear();
870 exex_manager.update_capacity();
871
872 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
874 }
875
876 #[tokio::test]
877 async fn test_updates_block_height() {
878 let temp_dir = tempfile::tempdir().unwrap();
879 let wal = Wal::new(temp_dir.path()).unwrap();
880
881 let provider_factory = create_test_provider_factory();
882
883 let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
884 "test_exex".to_string(),
885 Default::default(),
886 (),
887 EthEvmConfig::mainnet(),
888 wal.handle(),
889 );
890
891 assert!(exex_handle.finished_height.is_none());
893
894 let block = BlockNumHash::new(42, B256::random());
896 event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
897
898 let exex_manager = ExExManager::new(
900 provider_factory,
901 vec![exex_handle],
902 10,
903 Wal::new(temp_dir.path()).unwrap(),
904 empty_finalized_header_stream(),
905 );
906
907 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
908
909 let mut pinned_manager = std::pin::pin!(exex_manager);
911 let _ = pinned_manager.as_mut().poll(&mut cx);
912
913 let updated_exex_handle = &pinned_manager.exex_handles[0];
915 assert_eq!(updated_exex_handle.finished_height, Some(block));
916
917 let mut receiver = pinned_manager.handle.finished_height();
919
920 receiver.changed().await.unwrap();
922
923 let finished_height = *receiver.borrow();
925
926 assert_eq!(finished_height, FinishedExExHeight::Height(42));
928 }
929
930 #[tokio::test]
931 async fn test_updates_block_height_lower() {
932 let temp_dir = tempfile::tempdir().unwrap();
933 let wal = Wal::new(temp_dir.path()).unwrap();
934
935 let provider_factory = create_test_provider_factory();
936
937 let (exex_handle1, event_tx1, _) = ExExHandle::new(
939 "test_exex1".to_string(),
940 Default::default(),
941 (),
942 EthEvmConfig::mainnet(),
943 wal.handle(),
944 );
945 let (exex_handle2, event_tx2, _) = ExExHandle::new(
946 "test_exex2".to_string(),
947 Default::default(),
948 (),
949 EthEvmConfig::mainnet(),
950 wal.handle(),
951 );
952
953 let block1 = BlockNumHash::new(42, B256::random());
954 let block2 = BlockNumHash::new(10, B256::random());
955
956 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
958 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
959
960 let exex_manager = ExExManager::new(
961 provider_factory,
962 vec![exex_handle1, exex_handle2],
963 10,
964 Wal::new(temp_dir.path()).unwrap(),
965 empty_finalized_header_stream(),
966 );
967
968 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
969
970 let mut pinned_manager = std::pin::pin!(exex_manager);
971
972 let _ = pinned_manager.as_mut().poll(&mut cx);
973
974 let mut receiver = pinned_manager.handle.finished_height();
976
977 receiver.changed().await.unwrap();
979
980 let finished_height = *receiver.borrow();
982
983 assert_eq!(finished_height, FinishedExExHeight::Height(10));
985 }
986
987 #[tokio::test]
988 async fn test_updates_block_height_greater() {
989 let temp_dir = tempfile::tempdir().unwrap();
990 let wal = Wal::new(temp_dir.path()).unwrap();
991
992 let provider_factory = create_test_provider_factory();
993
994 let (exex_handle1, event_tx1, _) = ExExHandle::new(
996 "test_exex1".to_string(),
997 Default::default(),
998 (),
999 EthEvmConfig::mainnet(),
1000 wal.handle(),
1001 );
1002 let (exex_handle2, event_tx2, _) = ExExHandle::new(
1003 "test_exex2".to_string(),
1004 Default::default(),
1005 (),
1006 EthEvmConfig::mainnet(),
1007 wal.handle(),
1008 );
1009
1010 assert!(exex_handle1.finished_height.is_none());
1012
1013 let block1 = BlockNumHash::new(42, B256::random());
1014 let block2 = BlockNumHash::new(100, B256::random());
1015
1016 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
1018 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
1019
1020 let exex_manager = ExExManager::new(
1021 provider_factory,
1022 vec![exex_handle1, exex_handle2],
1023 10,
1024 Wal::new(temp_dir.path()).unwrap(),
1025 empty_finalized_header_stream(),
1026 );
1027
1028 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1029
1030 let mut pinned_manager = std::pin::pin!(exex_manager);
1031
1032 let _ = pinned_manager.as_mut().poll(&mut cx);
1033
1034 let mut receiver = pinned_manager.handle.finished_height();
1036
1037 receiver.changed().await.unwrap();
1039
1040 let finished_height = *receiver.borrow();
1042
1043 assert_eq!(finished_height, FinishedExExHeight::Height(42));
1045
1046 }
1050
1051 #[tokio::test]
1052 async fn test_exex_manager_capacity() {
1053 let temp_dir = tempfile::tempdir().unwrap();
1054 let wal = Wal::new(temp_dir.path()).unwrap();
1055
1056 let provider_factory = create_test_provider_factory();
1057
1058 let (exex_handle_1, _, _) = ExExHandle::new(
1059 "test_exex_1".to_string(),
1060 Default::default(),
1061 (),
1062 EthEvmConfig::mainnet(),
1063 wal.handle(),
1064 );
1065
1066 let max_capacity = 2;
1068 let exex_manager = ExExManager::new(
1069 provider_factory,
1070 vec![exex_handle_1],
1071 max_capacity,
1072 Wal::new(temp_dir.path()).unwrap(),
1073 empty_finalized_header_stream(),
1074 );
1075
1076 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1077
1078 let notification = ExExNotification::ChainCommitted {
1080 new: Arc::new(Chain::new(
1081 vec![Default::default()],
1082 Default::default(),
1083 Default::default(),
1084 )),
1085 };
1086
1087 exex_manager
1089 .handle
1090 .exex_tx
1091 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1092 .unwrap();
1093 exex_manager
1094 .handle
1095 .exex_tx
1096 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1097 .unwrap();
1098 exex_manager
1099 .handle
1100 .exex_tx
1101 .send((ExExNotificationSource::BlockchainTree, notification))
1102 .unwrap();
1103
1104 let mut pinned_manager = std::pin::pin!(exex_manager);
1106
1107 assert_eq!(pinned_manager.next_id, 0);
1109 assert_eq!(pinned_manager.buffer.len(), 0);
1110
1111 let _ = pinned_manager.as_mut().poll(&mut cx);
1112
1113 assert_eq!(pinned_manager.next_id, 2);
1115 assert_eq!(pinned_manager.buffer.len(), 2);
1116 }
1117
1118 #[tokio::test]
1119 async fn exex_handle_new() {
1120 let provider_factory = create_test_provider_factory();
1121 init_genesis(&provider_factory).unwrap();
1122 let provider = BlockchainProvider::new(provider_factory).unwrap();
1123
1124 let temp_dir = tempfile::tempdir().unwrap();
1125 let wal = Wal::new(temp_dir.path()).unwrap();
1126
1127 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1128 "test_exex".to_string(),
1129 Default::default(),
1130 provider,
1131 EthEvmConfig::mainnet(),
1132 wal.handle(),
1133 );
1134
1135 assert_eq!(exex_handle.id, "test_exex");
1137 assert_eq!(exex_handle.next_notification_id, 0);
1138
1139 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1141 block1.set_hash(B256::new([0x01; 32]));
1142 block1.set_block_number(10);
1143
1144 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1145 block2.set_hash(B256::new([0x02; 32]));
1146 block2.set_block_number(11);
1147
1148 let notification = ExExNotification::ChainCommitted {
1150 new: Arc::new(Chain::new(
1151 vec![Default::default()],
1152 Default::default(),
1153 Default::default(),
1154 )),
1155 };
1156
1157 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1158
1159 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1161 Poll::Ready(Ok(())) => {
1162 let received_notification = notifications.next().await.unwrap().unwrap();
1163 assert_eq!(received_notification, notification);
1164 }
1165 Poll::Pending => panic!("Notification send is pending"),
1166 Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1167 }
1168
1169 assert_eq!(exex_handle.next_notification_id, 23);
1171 }
1172
1173 #[tokio::test]
1174 async fn test_notification_if_finished_height_gt_chain_tip() {
1175 let provider_factory = create_test_provider_factory();
1176 init_genesis(&provider_factory).unwrap();
1177 let provider = BlockchainProvider::new(provider_factory).unwrap();
1178
1179 let temp_dir = tempfile::tempdir().unwrap();
1180 let wal = Wal::new(temp_dir.path()).unwrap();
1181
1182 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1183 "test_exex".to_string(),
1184 Default::default(),
1185 provider,
1186 EthEvmConfig::mainnet(),
1187 wal.handle(),
1188 );
1189
1190 exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1192
1193 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1194 block1.set_hash(B256::new([0x01; 32]));
1195 block1.set_block_number(10);
1196
1197 let notification = ExExNotification::ChainCommitted {
1198 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1199 };
1200
1201 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1202
1203 match exex_handle.send(&mut cx, &(22, notification)) {
1205 Poll::Ready(Ok(())) => {
1206 poll_fn(|cx| {
1207 assert!(notifications.poll_next_unpin(cx).is_pending());
1210 Poll::Ready(())
1211 })
1212 .await;
1213 }
1214 Poll::Pending | Poll::Ready(Err(_)) => {
1215 panic!("Notification should not be pending or fail");
1216 }
1217 }
1218
1219 assert_eq!(exex_handle.next_notification_id, 23);
1221 }
1222
1223 #[tokio::test]
1224 async fn test_sends_chain_reorged_notification() {
1225 let provider_factory = create_test_provider_factory();
1226 init_genesis(&provider_factory).unwrap();
1227 let provider = BlockchainProvider::new(provider_factory).unwrap();
1228
1229 let temp_dir = tempfile::tempdir().unwrap();
1230 let wal = Wal::new(temp_dir.path()).unwrap();
1231
1232 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1233 "test_exex".to_string(),
1234 Default::default(),
1235 provider,
1236 EthEvmConfig::mainnet(),
1237 wal.handle(),
1238 );
1239
1240 let notification = ExExNotification::ChainReorged {
1241 old: Arc::new(Chain::default()),
1242 new: Arc::new(Chain::default()),
1243 };
1244
1245 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1248
1249 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1250
1251 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1253 Poll::Ready(Ok(())) => {
1254 let received_notification = notifications.next().await.unwrap().unwrap();
1255 assert_eq!(received_notification, notification);
1256 }
1257 Poll::Pending | Poll::Ready(Err(_)) => {
1258 panic!("Notification should not be pending or fail")
1259 }
1260 }
1261
1262 assert_eq!(exex_handle.next_notification_id, 23);
1264 }
1265
1266 #[tokio::test]
1267 async fn test_sends_chain_reverted_notification() {
1268 let provider_factory = create_test_provider_factory();
1269 init_genesis(&provider_factory).unwrap();
1270 let provider = BlockchainProvider::new(provider_factory).unwrap();
1271
1272 let temp_dir = tempfile::tempdir().unwrap();
1273 let wal = Wal::new(temp_dir.path()).unwrap();
1274
1275 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1276 "test_exex".to_string(),
1277 Default::default(),
1278 provider,
1279 EthEvmConfig::mainnet(),
1280 wal.handle(),
1281 );
1282
1283 let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1284
1285 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1288
1289 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1290
1291 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1293 Poll::Ready(Ok(())) => {
1294 let received_notification = notifications.next().await.unwrap().unwrap();
1295 assert_eq!(received_notification, notification);
1296 }
1297 Poll::Pending | Poll::Ready(Err(_)) => {
1298 panic!("Notification should not be pending or fail")
1299 }
1300 }
1301
1302 assert_eq!(exex_handle.next_notification_id, 23);
1304 }
1305
1306 #[tokio::test]
1307 async fn test_exex_wal() -> eyre::Result<()> {
1308 reth_tracing::init_test_tracing();
1309
1310 let mut rng = generators::rng();
1311
1312 let provider_factory = create_test_provider_factory();
1313 let genesis_hash = init_genesis(&provider_factory).unwrap();
1314 let genesis_block = provider_factory
1315 .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1316 .unwrap()
1317 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1318
1319 let block = random_block(
1320 &mut rng,
1321 genesis_block.number + 1,
1322 BlockParams { parent: Some(genesis_hash), ..Default::default() },
1323 )
1324 .try_recover()
1325 .unwrap();
1326 let provider_rw = provider_factory.database_provider_rw().unwrap();
1327 provider_rw.insert_block(&block).unwrap();
1328 provider_rw.commit().unwrap();
1329
1330 let provider = BlockchainProvider::new(provider_factory).unwrap();
1331
1332 let temp_dir = tempfile::tempdir().unwrap();
1333 let wal = Wal::new(temp_dir.path()).unwrap();
1334
1335 let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1336 "test_exex".to_string(),
1337 Default::default(),
1338 provider.clone(),
1339 EthEvmConfig::mainnet(),
1340 wal.handle(),
1341 );
1342
1343 let genesis_notification = ExExNotification::ChainCommitted {
1344 new: Arc::new(Chain::new(
1345 vec![genesis_block.clone()],
1346 Default::default(),
1347 Default::default(),
1348 )),
1349 };
1350 let notification = ExExNotification::ChainCommitted {
1351 new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
1352 };
1353
1354 let (finalized_headers_tx, rx) = watch::channel(None);
1355 finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1356 let finalized_header_stream = ForkChoiceStream::new(rx);
1357
1358 let mut exex_manager = std::pin::pin!(ExExManager::new(
1359 provider,
1360 vec![exex_handle],
1361 2,
1362 wal,
1363 finalized_header_stream
1364 ));
1365
1366 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1367
1368 exex_manager
1369 .handle()
1370 .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1371 exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1372
1373 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1374 assert_eq!(
1375 notifications.try_poll_next_unpin(&mut cx)?,
1376 Poll::Ready(Some(genesis_notification))
1377 );
1378 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1379 assert_eq!(
1380 notifications.try_poll_next_unpin(&mut cx)?,
1381 Poll::Ready(Some(notification.clone()))
1382 );
1383 assert_eq!(
1385 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1386 std::slice::from_ref(¬ification)
1387 );
1388
1389 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1390 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1391 assert_eq!(
1393 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1394 std::slice::from_ref(¬ification)
1395 );
1396
1397 events_tx
1399 .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1400 .unwrap();
1401
1402 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1403 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1404 assert_eq!(
1407 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1408 std::slice::from_ref(¬ification)
1409 );
1410
1411 events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1413
1414 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1415 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1416 assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1418
1419 Ok(())
1420 }
1421
1422 #[tokio::test]
1423 async fn test_deadlock_manager_wakes_after_buffer_clears() {
1424 let temp_dir = tempfile::tempdir().unwrap();
1428 let wal = Wal::new(temp_dir.path()).unwrap();
1429 let provider_factory = create_test_provider_factory();
1430 init_genesis(&provider_factory).unwrap();
1431 let provider = BlockchainProvider::new(provider_factory.clone()).unwrap();
1432
1433 let (exex_handle, _, mut notifications) = ExExHandle::new(
1435 "test_exex".to_string(),
1436 Default::default(),
1437 provider,
1438 EthEvmConfig::mainnet(),
1439 wal.handle(),
1440 );
1441
1442 let max_capacity = 2;
1443 let exex_manager = ExExManager::new(
1444 provider_factory,
1445 vec![exex_handle],
1446 max_capacity,
1447 wal,
1448 empty_finalized_header_stream(),
1449 );
1450
1451 let manager_handle = exex_manager.handle();
1452
1453 tokio::spawn(async move {
1455 exex_manager.await.ok();
1456 });
1457
1458 let mut rng = generators::rng();
1460 let mut make_notif = |id: u64| {
1461 let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
1462 ExExNotification::ChainCommitted {
1463 new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
1464 }
1465 };
1466
1467 manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap();
1468
1469 manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap();
1473
1474 let _ = notifications.next().await.unwrap();
1482
1483 let result =
1488 tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await;
1489
1490 assert!(
1491 result.is_ok(),
1492 "Deadlock detected! Manager failed to wake up and process Pending Item #100."
1493 );
1494 }
1495}