1use crate::{
2 wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::execute::BlockExecutorProvider;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18 collections::VecDeque,
19 fmt::Debug,
20 future::{poll_fn, Future},
21 ops::Not,
22 pin::Pin,
23 sync::{
24 atomic::{AtomicUsize, Ordering},
25 Arc,
26 },
27 task::{ready, Context, Poll},
28};
29use tokio::sync::{
30 mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31 watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default capacity, in notifications, for the buffer of an `ExExManager`.
///
/// NOTE(review): not referenced inside this module; presumably callers pass it as the
/// `max_capacity` argument of `ExExManager::new` — confirm against the call sites.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;

/// Number of blocks in the WAL above which a warning is logged after a finalization attempt.
///
/// A WAL that keeps growing past this threshold is not being cleared, which leads to increased
/// disk space usage.
pub const WAL_BLOCKS_WARNING: usize = 128;
46
/// The component that produced a notification handed to the ExEx manager.
///
/// The manager uses the source to decide whether a notification has to be committed to the WAL
/// before it is buffered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The notification was sent from the pipeline; the manager skips the WAL commit for it.
    Pipeline,
    /// The notification was sent from the blockchain tree; the manager commits it to the WAL.
    BlockchainTree,
}
58
59#[derive(Metrics)]
61#[metrics(scope = "exex")]
62struct ExExMetrics {
63 notifications_sent_total: Counter,
65 events_sent_total: Counter,
67}
68
69#[derive(Debug)]
75pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
76 id: String,
78 metrics: ExExMetrics,
80 sender: PollSender<ExExNotification<N>>,
82 receiver: UnboundedReceiver<ExExEvent>,
84 next_notification_id: usize,
86 finished_height: Option<BlockNumHash>,
90}
91
92impl<N: NodePrimitives> ExExHandle<N> {
93 pub fn new<P, E: BlockExecutorProvider<Primitives = N>>(
98 id: String,
99 node_head: BlockNumHash,
100 provider: P,
101 executor: E,
102 wal_handle: WalHandle<N>,
103 ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
104 let (notification_tx, notification_rx) = mpsc::channel(1);
105 let (event_tx, event_rx) = mpsc::unbounded_channel();
106 let notifications =
107 ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
108
109 (
110 Self {
111 id: id.clone(),
112 metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
113 sender: PollSender::new(notification_tx),
114 receiver: event_rx,
115 next_notification_id: 0,
116 finished_height: None,
117 },
118 event_tx,
119 notifications,
120 )
121 }
122
123 fn send(
128 &mut self,
129 cx: &mut Context<'_>,
130 (notification_id, notification): &(usize, ExExNotification<N>),
131 ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
132 if let Some(finished_height) = self.finished_height {
133 match notification {
134 ExExNotification::ChainCommitted { new } => {
135 if finished_height.number >= new.tip().number() {
139 debug!(
140 target: "exex::manager",
141 exex_id = %self.id,
142 %notification_id,
143 ?finished_height,
144 new_tip = %new.tip().number(),
145 "Skipping notification"
146 );
147
148 self.next_notification_id = notification_id + 1;
149 return Poll::Ready(Ok(()))
150 }
151 }
152 ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
157 }
158 }
159
160 debug!(
161 target: "exex::manager",
162 exex_id = %self.id,
163 %notification_id,
164 "Reserving slot for notification"
165 );
166 match self.sender.poll_reserve(cx) {
167 Poll::Ready(Ok(())) => (),
168 other => return other,
169 }
170
171 debug!(
172 target: "exex::manager",
173 exex_id = %self.id,
174 %notification_id,
175 "Sending notification"
176 );
177 match self.sender.send_item(notification.clone()) {
178 Ok(()) => {
179 self.next_notification_id = notification_id + 1;
180 self.metrics.notifications_sent_total.increment(1);
181 Poll::Ready(Ok(()))
182 }
183 Err(err) => Poll::Ready(Err(err)),
184 }
185 }
186}
187
188#[derive(Metrics)]
190#[metrics(scope = "exex.manager")]
191pub struct ExExManagerMetrics {
192 max_capacity: Gauge,
194 current_capacity: Gauge,
196 buffer_size: Gauge,
200 num_exexs: Gauge,
202}
203
204#[derive(Debug)]
214pub struct ExExManager<P, N: NodePrimitives> {
215 provider: P,
217
218 exex_handles: Vec<ExExHandle<N>>,
220
221 handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
223
224 min_id: usize,
226 next_id: usize,
228 buffer: VecDeque<(usize, ExExNotification<N>)>,
233 max_capacity: usize,
235 current_capacity: Arc<AtomicUsize>,
239
240 is_ready: watch::Sender<bool>,
242
243 finished_height: watch::Sender<FinishedExExHeight>,
245
246 wal: Wal<N>,
248 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
250
251 handle: ExExManagerHandle<N>,
253 metrics: ExExManagerMetrics,
255}
256
257impl<P, N> ExExManager<P, N>
258where
259 N: NodePrimitives,
260{
261 pub fn new(
269 provider: P,
270 handles: Vec<ExExHandle<N>>,
271 max_capacity: usize,
272 wal: Wal<N>,
273 finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
274 ) -> Self {
275 let num_exexs = handles.len();
276
277 let (handle_tx, handle_rx) = mpsc::unbounded_channel();
278 let (is_ready_tx, is_ready_rx) = watch::channel(true);
279 let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
280 FinishedExExHeight::NoExExs
281 } else {
282 FinishedExExHeight::NotReady
283 });
284
285 let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
286
287 let metrics = ExExManagerMetrics::default();
288 metrics.max_capacity.set(max_capacity as f64);
289 metrics.num_exexs.set(num_exexs as f64);
290
291 Self {
292 provider,
293
294 exex_handles: handles,
295
296 handle_rx,
297
298 min_id: 0,
299 next_id: 0,
300 buffer: VecDeque::with_capacity(max_capacity),
301 max_capacity,
302 current_capacity: Arc::clone(¤t_capacity),
303
304 is_ready: is_ready_tx,
305 finished_height: finished_height_tx,
306
307 wal,
308 finalized_header_stream,
309
310 handle: ExExManagerHandle {
311 exex_tx: handle_tx,
312 num_exexs,
313 is_ready_receiver: is_ready_rx.clone(),
314 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
315 current_capacity,
316 finished_height: finished_height_rx,
317 },
318 metrics,
319 }
320 }
321
322 pub fn handle(&self) -> ExExManagerHandle<N> {
324 self.handle.clone()
325 }
326
327 fn update_capacity(&self) {
330 let capacity = self.max_capacity.saturating_sub(self.buffer.len());
331 self.current_capacity.store(capacity, Ordering::Relaxed);
332 self.metrics.current_capacity.set(capacity as f64);
333 self.metrics.buffer_size.set(self.buffer.len() as f64);
334
335 let _ = self.is_ready.send(capacity > 0);
338 }
339
340 fn push_notification(&mut self, notification: ExExNotification<N>) {
343 let next_id = self.next_id;
344 self.buffer.push_back((next_id, notification));
345 self.next_id += 1;
346 }
347}
348
349impl<P, N> ExExManager<P, N>
350where
351 P: HeaderProvider,
352 N: NodePrimitives,
353{
354 fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
359 debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
360
361 let exex_finished_heights = self
363 .exex_handles
364 .iter()
365 .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
367 .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
369 .map(|(exex_id, num_hash)| {
371 num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
372 self.provider
373 .is_known(&num_hash.hash)
374 .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
376 })
377 })
378 .collect::<Result<Vec<_>, _>>()?;
380 if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
381 let lowest_finished_height = exex_finished_heights
385 .iter()
386 .copied()
387 .filter_map(|(_, num_hash, _)| num_hash)
388 .chain([(finalized_header.num_hash())])
389 .min_by_key(|num_hash| num_hash.number)
390 .unwrap();
391
392 self.wal.finalize(lowest_finished_height)?;
393 if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
394 warn!(
395 target: "exex::manager",
396 blocks = ?self.wal.num_blocks(),
397 "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
398 );
399 }
400 } else {
401 let unfinalized_exexes = exex_finished_heights
402 .into_iter()
403 .filter_map(|(exex_id, num_hash, is_canonical)| {
404 is_canonical.not().then_some((exex_id, num_hash))
405 })
406 .format_with(", ", |(exex_id, num_hash), f| {
407 f(&format_args!("{exex_id} = {num_hash:?}"))
408 })
409 .to_string();
412 debug!(
413 target: "exex::manager",
414 %unfinalized_exexes,
415 "Not all ExExes are on the canonical chain, can't finalize the WAL"
416 );
417 }
418
419 Ok(())
420 }
421}
422
423impl<P, N> Future for ExExManager<P, N>
424where
425 P: HeaderProvider + Unpin + 'static,
426 N: NodePrimitives,
427{
428 type Output = eyre::Result<()>;
429
430 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442 let this = self.get_mut();
443
444 for exex in &mut this.exex_handles {
446 while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
447 debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
448 exex.metrics.events_sent_total.increment(1);
449 match event {
450 ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
451 }
452 }
453 }
454
455 let mut last_finalized_header = None;
457 while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
458 last_finalized_header = finalized_header;
459 }
460 if let Some(header) = last_finalized_header {
461 this.finalize_wal(header)?;
462 }
463
464 while this.buffer.len() < this.max_capacity {
466 if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
467 let committed_tip =
468 notification.committed_chain().map(|chain| chain.tip().number());
469 let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
470 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
471
472 match source {
475 ExExNotificationSource::BlockchainTree => {
476 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
477 this.wal.commit(¬ification)?;
478 }
479 ExExNotificationSource::Pipeline => {
480 debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
481 }
482 }
483
484 this.push_notification(notification);
485 continue
486 }
487 break
488 }
489
490 this.update_capacity();
492
493 let mut min_id = usize::MAX;
495 for idx in (0..this.exex_handles.len()).rev() {
496 let mut exex = this.exex_handles.swap_remove(idx);
497
498 let notification_index = exex
501 .next_notification_id
502 .checked_sub(this.min_id)
503 .expect("exex expected notification ID outside the manager's range");
504 if let Some(notification) = this.buffer.get(notification_index) {
505 if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
506 return Poll::Ready(Err(err.into()))
508 }
509 }
510 min_id = min_id.min(exex.next_notification_id);
511 this.exex_handles.push(exex);
512 }
513
514 debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
516 this.buffer.retain(|&(id, _)| id >= min_id);
517 this.min_id = min_id;
518
519 this.update_capacity();
521
522 let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
524 exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
525 });
526 if let Ok(finished_height) = finished_height {
527 let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
528 }
529
530 Poll::Pending
531 }
532}
533
534#[derive(Debug)]
536pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
537 exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
539 num_exexs: usize,
541 is_ready_receiver: watch::Receiver<bool>,
547 is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
550 current_capacity: Arc<AtomicUsize>,
552 finished_height: watch::Receiver<FinishedExExHeight>,
554}
555
556impl<N: NodePrimitives> ExExManagerHandle<N> {
557 pub fn empty() -> Self {
563 let (exex_tx, _) = mpsc::unbounded_channel();
564 let (_, is_ready_rx) = watch::channel(true);
565 let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
566
567 Self {
568 exex_tx,
569 num_exexs: 0,
570 is_ready_receiver: is_ready_rx.clone(),
571 is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
572 current_capacity: Arc::new(AtomicUsize::new(0)),
573 finished_height: finished_height_rx,
574 }
575 }
576
577 pub fn send(
581 &self,
582 source: ExExNotificationSource,
583 notification: ExExNotification<N>,
584 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
585 self.exex_tx.send((source, notification))
586 }
587
588 pub async fn send_async(
593 &mut self,
594 source: ExExNotificationSource,
595 notification: ExExNotification<N>,
596 ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
597 self.ready().await;
598 self.exex_tx.send((source, notification))
599 }
600
601 pub fn capacity(&self) -> usize {
603 self.current_capacity.load(Ordering::Relaxed)
604 }
605
606 pub fn has_capacity(&self) -> bool {
611 self.capacity() > 0
612 }
613
614 pub const fn has_exexs(&self) -> bool {
616 self.num_exexs > 0
617 }
618
619 pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
621 self.finished_height.clone()
622 }
623
624 pub async fn ready(&mut self) {
626 poll_fn(|cx| self.poll_ready(cx)).await
627 }
628
629 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
631 let rx = ready!(self.is_ready.poll(cx));
632 self.is_ready.set(make_wait_future(rx));
633 Poll::Ready(())
634 }
635}
636
637async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
639 let _ = rx.wait_for(|ready| *ready).await;
642 rx
643}
644
645impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
646 fn clone(&self) -> Self {
647 Self {
648 exex_tx: self.exex_tx.clone(),
649 num_exexs: self.num_exexs,
650 is_ready_receiver: self.is_ready_receiver.clone(),
651 is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
652 current_capacity: self.current_capacity.clone(),
653 finished_height: self.finished_height.clone(),
654 }
655 }
656}
657
658#[cfg(test)]
659mod tests {
660 use super::*;
661 use crate::wal::WalResult;
662 use alloy_primitives::B256;
663 use futures::{StreamExt, TryStreamExt};
664 use rand::Rng;
665 use reth_db_common::init::init_genesis;
666 use reth_evm::test_utils::MockExecutorProvider;
667 use reth_evm_ethereum::execute::EthExecutorProvider;
668 use reth_primitives_traits::RecoveredBlock;
669 use reth_provider::{
670 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
671 BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
672 };
673 use reth_testing_utils::generators::{self, random_block, BlockParams};
674
675 fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
676 let (tx, rx) = watch::channel(None);
677 std::mem::forget(tx);
679 ForkChoiceStream::new(rx)
680 }
681
682 #[tokio::test]
683 async fn test_delivers_events() {
684 let temp_dir = tempfile::tempdir().unwrap();
685 let wal = Wal::new(temp_dir.path()).unwrap();
686
687 let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
688 "test_exex".to_string(),
689 Default::default(),
690 (),
691 MockExecutorProvider::default(),
692 wal.handle(),
693 );
694
695 let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
697 event_tx.send(event).unwrap();
698 let received_event = exex_handle.receiver.recv().await.unwrap();
699 assert_eq!(received_event, event);
700 }
701
702 #[tokio::test]
703 async fn test_has_exexs() {
704 let temp_dir = tempfile::tempdir().unwrap();
705 let wal = Wal::new(temp_dir.path()).unwrap();
706
707 let (exex_handle_1, _, _) = ExExHandle::new(
708 "test_exex_1".to_string(),
709 Default::default(),
710 (),
711 MockExecutorProvider::default(),
712 wal.handle(),
713 );
714
715 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
716 .handle
717 .has_exexs());
718
719 assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
720 .handle
721 .has_exexs());
722 }
723
724 #[tokio::test]
725 async fn test_has_capacity() {
726 let temp_dir = tempfile::tempdir().unwrap();
727 let wal = Wal::new(temp_dir.path()).unwrap();
728
729 let (exex_handle_1, _, _) = ExExHandle::new(
730 "test_exex_1".to_string(),
731 Default::default(),
732 (),
733 MockExecutorProvider::default(),
734 wal.handle(),
735 );
736
737 assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
738 .handle
739 .has_capacity());
740
741 assert!(ExExManager::new(
742 (),
743 vec![exex_handle_1],
744 10,
745 wal,
746 empty_finalized_header_stream()
747 )
748 .handle
749 .has_capacity());
750 }
751
752 #[test]
753 fn test_push_notification() {
754 let temp_dir = tempfile::tempdir().unwrap();
755 let wal = Wal::new(temp_dir.path()).unwrap();
756
757 let (exex_handle, _, _) = ExExHandle::new(
758 "test_exex".to_string(),
759 Default::default(),
760 (),
761 MockExecutorProvider::default(),
762 wal.handle(),
763 );
764
765 let mut exex_manager =
767 ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
768
769 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
771 block1.set_hash(B256::new([0x01; 32]));
772 block1.set_block_number(10);
773
774 let notification1 = ExExNotification::ChainCommitted {
775 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
776 };
777
778 exex_manager.push_notification(notification1.clone());
780
781 assert_eq!(exex_manager.buffer.len(), 1);
783 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
784 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
785 assert_eq!(exex_manager.next_id, 1);
786
787 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
789 block2.set_hash(B256::new([0x02; 32]));
790 block2.set_block_number(20);
791
792 let notification2 = ExExNotification::ChainCommitted {
793 new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
794 };
795
796 exex_manager.push_notification(notification2.clone());
797
798 assert_eq!(exex_manager.buffer.len(), 2);
800 assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
801 assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
802 assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
803 assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
804 assert_eq!(exex_manager.next_id, 2);
805 }
806
807 #[test]
808 fn test_update_capacity() {
809 let temp_dir = tempfile::tempdir().unwrap();
810 let wal = Wal::new(temp_dir.path()).unwrap();
811
812 let (exex_handle, _, _) = ExExHandle::new(
813 "test_exex".to_string(),
814 Default::default(),
815 (),
816 MockExecutorProvider::default(),
817 wal.handle(),
818 );
819
820 let max_capacity = 5;
822 let mut exex_manager = ExExManager::new(
823 (),
824 vec![exex_handle],
825 max_capacity,
826 wal,
827 empty_finalized_header_stream(),
828 );
829
830 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
832 block1.set_hash(B256::new([0x01; 32]));
833 block1.set_block_number(10);
834
835 let notification1 = ExExNotification::ChainCommitted {
836 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
837 };
838
839 exex_manager.push_notification(notification1.clone());
840 exex_manager.push_notification(notification1);
841
842 exex_manager.update_capacity();
844
845 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
847
848 exex_manager.buffer.clear();
850 exex_manager.update_capacity();
851
852 assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
854 }
855
856 #[tokio::test]
857 async fn test_updates_block_height() {
858 let temp_dir = tempfile::tempdir().unwrap();
859 let wal = Wal::new(temp_dir.path()).unwrap();
860
861 let provider_factory = create_test_provider_factory();
862
863 let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
864 "test_exex".to_string(),
865 Default::default(),
866 (),
867 MockExecutorProvider::default(),
868 wal.handle(),
869 );
870
871 assert!(exex_handle.finished_height.is_none());
873
874 let block = BlockNumHash::new(42, B256::random());
876 event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
877
878 let exex_manager = ExExManager::new(
880 provider_factory,
881 vec![exex_handle],
882 10,
883 Wal::new(temp_dir.path()).unwrap(),
884 empty_finalized_header_stream(),
885 );
886
887 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
888
889 let mut pinned_manager = std::pin::pin!(exex_manager);
891 let _ = pinned_manager.as_mut().poll(&mut cx);
892
893 let updated_exex_handle = &pinned_manager.exex_handles[0];
895 assert_eq!(updated_exex_handle.finished_height, Some(block));
896
897 let mut receiver = pinned_manager.handle.finished_height();
899
900 receiver.changed().await.unwrap();
902
903 let finished_height = *receiver.borrow();
905
906 assert_eq!(finished_height, FinishedExExHeight::Height(42));
908 }
909
910 #[tokio::test]
911 async fn test_updates_block_height_lower() {
912 let temp_dir = tempfile::tempdir().unwrap();
913 let wal = Wal::new(temp_dir.path()).unwrap();
914
915 let provider_factory = create_test_provider_factory();
916
917 let (exex_handle1, event_tx1, _) = ExExHandle::new(
919 "test_exex1".to_string(),
920 Default::default(),
921 (),
922 MockExecutorProvider::default(),
923 wal.handle(),
924 );
925 let (exex_handle2, event_tx2, _) = ExExHandle::new(
926 "test_exex2".to_string(),
927 Default::default(),
928 (),
929 MockExecutorProvider::default(),
930 wal.handle(),
931 );
932
933 let block1 = BlockNumHash::new(42, B256::random());
934 let block2 = BlockNumHash::new(10, B256::random());
935
936 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
938 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
939
940 let exex_manager = ExExManager::new(
941 provider_factory,
942 vec![exex_handle1, exex_handle2],
943 10,
944 Wal::new(temp_dir.path()).unwrap(),
945 empty_finalized_header_stream(),
946 );
947
948 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
949
950 let mut pinned_manager = std::pin::pin!(exex_manager);
951
952 let _ = pinned_manager.as_mut().poll(&mut cx);
953
954 let mut receiver = pinned_manager.handle.finished_height();
956
957 receiver.changed().await.unwrap();
959
960 let finished_height = *receiver.borrow();
962
963 assert_eq!(finished_height, FinishedExExHeight::Height(10));
965 }
966
967 #[tokio::test]
968 async fn test_updates_block_height_greater() {
969 let temp_dir = tempfile::tempdir().unwrap();
970 let wal = Wal::new(temp_dir.path()).unwrap();
971
972 let provider_factory = create_test_provider_factory();
973
974 let (exex_handle1, event_tx1, _) = ExExHandle::new(
976 "test_exex1".to_string(),
977 Default::default(),
978 (),
979 MockExecutorProvider::default(),
980 wal.handle(),
981 );
982 let (exex_handle2, event_tx2, _) = ExExHandle::new(
983 "test_exex2".to_string(),
984 Default::default(),
985 (),
986 MockExecutorProvider::default(),
987 wal.handle(),
988 );
989
990 assert!(exex_handle1.finished_height.is_none());
992
993 let block1 = BlockNumHash::new(42, B256::random());
994 let block2 = BlockNumHash::new(100, B256::random());
995
996 event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
998 event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
999
1000 let exex_manager = ExExManager::new(
1001 provider_factory,
1002 vec![exex_handle1, exex_handle2],
1003 10,
1004 Wal::new(temp_dir.path()).unwrap(),
1005 empty_finalized_header_stream(),
1006 );
1007
1008 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1009
1010 let mut pinned_manager = std::pin::pin!(exex_manager);
1011
1012 let _ = pinned_manager.as_mut().poll(&mut cx);
1013
1014 let mut receiver = pinned_manager.handle.finished_height();
1016
1017 receiver.changed().await.unwrap();
1019
1020 let finished_height = *receiver.borrow();
1022
1023 assert_eq!(finished_height, FinishedExExHeight::Height(42));
1025
1026 }
1030
1031 #[tokio::test]
1032 async fn test_exex_manager_capacity() {
1033 let temp_dir = tempfile::tempdir().unwrap();
1034 let wal = Wal::new(temp_dir.path()).unwrap();
1035
1036 let provider_factory = create_test_provider_factory();
1037
1038 let (exex_handle_1, _, _) = ExExHandle::new(
1039 "test_exex_1".to_string(),
1040 Default::default(),
1041 (),
1042 MockExecutorProvider::default(),
1043 wal.handle(),
1044 );
1045
1046 let max_capacity = 2;
1048 let exex_manager = ExExManager::new(
1049 provider_factory,
1050 vec![exex_handle_1],
1051 max_capacity,
1052 Wal::new(temp_dir.path()).unwrap(),
1053 empty_finalized_header_stream(),
1054 );
1055
1056 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1057
1058 let notification = ExExNotification::ChainCommitted {
1060 new: Arc::new(Chain::new(
1061 vec![Default::default()],
1062 Default::default(),
1063 Default::default(),
1064 )),
1065 };
1066
1067 exex_manager
1069 .handle
1070 .exex_tx
1071 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1072 .unwrap();
1073 exex_manager
1074 .handle
1075 .exex_tx
1076 .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1077 .unwrap();
1078 exex_manager
1079 .handle
1080 .exex_tx
1081 .send((ExExNotificationSource::BlockchainTree, notification))
1082 .unwrap();
1083
1084 let mut pinned_manager = std::pin::pin!(exex_manager);
1086
1087 assert_eq!(pinned_manager.next_id, 0);
1089 assert_eq!(pinned_manager.buffer.len(), 0);
1090
1091 let _ = pinned_manager.as_mut().poll(&mut cx);
1092
1093 assert_eq!(pinned_manager.next_id, 2);
1095 assert_eq!(pinned_manager.buffer.len(), 2);
1096 }
1097
1098 #[tokio::test]
1099 async fn exex_handle_new() {
1100 let provider_factory = create_test_provider_factory();
1101 init_genesis(&provider_factory).unwrap();
1102 let provider = BlockchainProvider::new(provider_factory).unwrap();
1103
1104 let temp_dir = tempfile::tempdir().unwrap();
1105 let wal = Wal::new(temp_dir.path()).unwrap();
1106
1107 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1108 "test_exex".to_string(),
1109 Default::default(),
1110 provider,
1111 EthExecutorProvider::mainnet(),
1112 wal.handle(),
1113 );
1114
1115 assert_eq!(exex_handle.id, "test_exex");
1117 assert_eq!(exex_handle.next_notification_id, 0);
1118
1119 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1121 block1.set_hash(B256::new([0x01; 32]));
1122 block1.set_block_number(10);
1123
1124 let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1125 block2.set_hash(B256::new([0x02; 32]));
1126 block2.set_block_number(11);
1127
1128 let notification = ExExNotification::ChainCommitted {
1130 new: Arc::new(Chain::new(
1131 vec![block1.clone(), block2.clone()],
1132 Default::default(),
1133 Default::default(),
1134 )),
1135 };
1136
1137 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1138
1139 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1141 Poll::Ready(Ok(())) => {
1142 let received_notification = notifications.next().await.unwrap().unwrap();
1143 assert_eq!(received_notification, notification);
1144 }
1145 Poll::Pending => panic!("Notification send is pending"),
1146 Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1147 }
1148
1149 assert_eq!(exex_handle.next_notification_id, 23);
1151 }
1152
1153 #[tokio::test]
1154 async fn test_notification_if_finished_height_gt_chain_tip() {
1155 let provider_factory = create_test_provider_factory();
1156 init_genesis(&provider_factory).unwrap();
1157 let provider = BlockchainProvider::new(provider_factory).unwrap();
1158
1159 let temp_dir = tempfile::tempdir().unwrap();
1160 let wal = Wal::new(temp_dir.path()).unwrap();
1161
1162 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1163 "test_exex".to_string(),
1164 Default::default(),
1165 provider,
1166 EthExecutorProvider::mainnet(),
1167 wal.handle(),
1168 );
1169
1170 exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1172
1173 let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1174 block1.set_hash(B256::new([0x01; 32]));
1175 block1.set_block_number(10);
1176
1177 let notification = ExExNotification::ChainCommitted {
1178 new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1179 };
1180
1181 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1182
1183 match exex_handle.send(&mut cx, &(22, notification)) {
1185 Poll::Ready(Ok(())) => {
1186 poll_fn(|cx| {
1187 assert!(notifications.poll_next_unpin(cx).is_pending());
1190 Poll::Ready(())
1191 })
1192 .await;
1193 }
1194 Poll::Pending | Poll::Ready(Err(_)) => {
1195 panic!("Notification should not be pending or fail");
1196 }
1197 }
1198
1199 assert_eq!(exex_handle.next_notification_id, 23);
1201 }
1202
1203 #[tokio::test]
1204 async fn test_sends_chain_reorged_notification() {
1205 let provider_factory = create_test_provider_factory();
1206 init_genesis(&provider_factory).unwrap();
1207 let provider = BlockchainProvider::new(provider_factory).unwrap();
1208
1209 let temp_dir = tempfile::tempdir().unwrap();
1210 let wal = Wal::new(temp_dir.path()).unwrap();
1211
1212 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1213 "test_exex".to_string(),
1214 Default::default(),
1215 provider,
1216 EthExecutorProvider::mainnet(),
1217 wal.handle(),
1218 );
1219
1220 let notification = ExExNotification::ChainReorged {
1221 old: Arc::new(Chain::default()),
1222 new: Arc::new(Chain::default()),
1223 };
1224
1225 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1228
1229 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1230
1231 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1233 Poll::Ready(Ok(())) => {
1234 let received_notification = notifications.next().await.unwrap().unwrap();
1235 assert_eq!(received_notification, notification);
1236 }
1237 Poll::Pending | Poll::Ready(Err(_)) => {
1238 panic!("Notification should not be pending or fail")
1239 }
1240 }
1241
1242 assert_eq!(exex_handle.next_notification_id, 23);
1244 }
1245
1246 #[tokio::test]
1247 async fn test_sends_chain_reverted_notification() {
1248 let provider_factory = create_test_provider_factory();
1249 init_genesis(&provider_factory).unwrap();
1250 let provider = BlockchainProvider::new(provider_factory).unwrap();
1251
1252 let temp_dir = tempfile::tempdir().unwrap();
1253 let wal = Wal::new(temp_dir.path()).unwrap();
1254
1255 let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1256 "test_exex".to_string(),
1257 Default::default(),
1258 provider,
1259 EthExecutorProvider::mainnet(),
1260 wal.handle(),
1261 );
1262
1263 let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1264
1265 exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1268
1269 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1270
1271 match exex_handle.send(&mut cx, &(22, notification.clone())) {
1273 Poll::Ready(Ok(())) => {
1274 let received_notification = notifications.next().await.unwrap().unwrap();
1275 assert_eq!(received_notification, notification);
1276 }
1277 Poll::Pending | Poll::Ready(Err(_)) => {
1278 panic!("Notification should not be pending or fail")
1279 }
1280 }
1281
1282 assert_eq!(exex_handle.next_notification_id, 23);
1284 }
1285
1286 #[tokio::test]
1287 async fn test_exex_wal() -> eyre::Result<()> {
1288 reth_tracing::init_test_tracing();
1289
1290 let mut rng = generators::rng();
1291
1292 let provider_factory = create_test_provider_factory();
1293 let genesis_hash = init_genesis(&provider_factory).unwrap();
1294 let genesis_block = provider_factory
1295 .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1296 .unwrap()
1297 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1298
1299 let block = random_block(
1300 &mut rng,
1301 genesis_block.number + 1,
1302 BlockParams { parent: Some(genesis_hash), ..Default::default() },
1303 )
1304 .try_recover()
1305 .unwrap();
1306 let provider_rw = provider_factory.database_provider_rw().unwrap();
1307 provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
1308 provider_rw.commit().unwrap();
1309
1310 let provider = BlockchainProvider::new(provider_factory).unwrap();
1311
1312 let temp_dir = tempfile::tempdir().unwrap();
1313 let wal = Wal::new(temp_dir.path()).unwrap();
1314
1315 let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1316 "test_exex".to_string(),
1317 Default::default(),
1318 provider.clone(),
1319 EthExecutorProvider::mainnet(),
1320 wal.handle(),
1321 );
1322
1323 let genesis_notification = ExExNotification::ChainCommitted {
1324 new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
1325 };
1326 let notification = ExExNotification::ChainCommitted {
1327 new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
1328 };
1329
1330 let (finalized_headers_tx, rx) = watch::channel(None);
1331 finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1332 let finalized_header_stream = ForkChoiceStream::new(rx);
1333
1334 let mut exex_manager = std::pin::pin!(ExExManager::new(
1335 provider,
1336 vec![exex_handle],
1337 2,
1338 wal,
1339 finalized_header_stream
1340 ));
1341
1342 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1343
1344 exex_manager
1345 .handle()
1346 .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1347 exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1348
1349 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1350 assert_eq!(
1351 notifications.try_poll_next_unpin(&mut cx)?,
1352 Poll::Ready(Some(genesis_notification))
1353 );
1354 assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1355 assert_eq!(
1356 notifications.try_poll_next_unpin(&mut cx)?,
1357 Poll::Ready(Some(notification.clone()))
1358 );
1359 assert_eq!(
1361 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1362 [notification.clone()]
1363 );
1364
1365 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1366 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1367 assert_eq!(
1369 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1370 [notification.clone()]
1371 );
1372
1373 events_tx
1375 .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1376 .unwrap();
1377
1378 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1379 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1380 assert_eq!(
1383 exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1384 [notification]
1385 );
1386
1387 events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1389
1390 finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1391 assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1392 assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1394
1395 Ok(())
1396 }
1397}