Skip to main content

reth_exex/
manager.rs

1use crate::{
2    wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::ConfigureEvm;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18    collections::VecDeque,
19    fmt::Debug,
20    future::{poll_fn, Future},
21    ops::Not,
22    pin::Pin,
23    sync::{
24        atomic::{AtomicUsize, Ordering},
25        Arc,
26    },
27    task::{ready, Context, Poll},
28};
29use tokio::sync::{
30    mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31    watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default max size of the internal state notifications buffer.
///
/// 1024 notifications in the buffer is roughly 3.4 hours of mainnet blocks (12-second block
/// times), or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
40
/// Default maximum number of blocks allowed in the WAL before emitting a warning.
///
/// This constant defines the default threshold for the Write-Ahead Log (WAL) size. If the number
/// of blocks in the WAL exceeds this limit, a warning is logged to indicate potential issues.
///
/// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with
/// faster block times, this value should be increased proportionally to avoid excessive warnings.
pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
49
/// The source of the notification.
///
/// This distinction is needed so that pipeline notifications are not committed to the
/// [WAL](`Wal`), because they are already finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The notification was sent from the pipeline.
    Pipeline,
    /// The notification was sent from the blockchain tree.
    BlockchainTree,
}
61
62/// Metrics for an `ExEx`.
63#[derive(Metrics)]
64#[metrics(scope = "exex")]
65struct ExExMetrics {
66    /// The total number of notifications sent to an `ExEx`.
67    notifications_sent_total: Counter,
68    /// The total number of events an `ExEx` has sent to the manager.
69    events_sent_total: Counter,
70}
71
72/// A handle to an `ExEx` used by the [`ExExManager`] to communicate with `ExEx`'s.
73///
74/// A handle should be created for each `ExEx` with a unique ID. The channels returned by
75/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to
76/// the manager in [`ExExManager::new`].
77#[derive(Debug)]
78pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
79    /// The execution extension's ID.
80    id: String,
81    /// Metrics for an `ExEx`.
82    metrics: ExExMetrics,
83    /// Channel to send [`ExExNotification`]s to the `ExEx`.
84    sender: PollSender<ExExNotification<N>>,
85    /// Channel to receive [`ExExEvent`]s from the `ExEx`.
86    receiver: UnboundedReceiver<ExExEvent>,
87    /// The ID of the next notification to send to this `ExEx`.
88    next_notification_id: usize,
89    /// The finished block of the `ExEx`.
90    ///
91    /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
92    finished_height: Option<BlockNumHash>,
93}
94
95impl<N: NodePrimitives> ExExHandle<N> {
96    /// Create a new handle for the given `ExEx`.
97    ///
98    /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
99    /// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
100    pub fn new<P, E: ConfigureEvm<Primitives = N>>(
101        id: String,
102        node_head: BlockNumHash,
103        provider: P,
104        evm_config: E,
105        wal_handle: WalHandle<N>,
106    ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
107        let (notification_tx, notification_rx) = mpsc::channel(1);
108        let (event_tx, event_rx) = mpsc::unbounded_channel();
109        let notifications =
110            ExExNotifications::new(node_head, provider, evm_config, notification_rx, wal_handle);
111
112        (
113            Self {
114                id: id.clone(),
115                metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
116                sender: PollSender::new(notification_tx),
117                receiver: event_rx,
118                next_notification_id: 0,
119                finished_height: None,
120            },
121            event_tx,
122            notifications,
123        )
124    }
125
126    /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was
127    /// successfully reserved.
128    ///
129    /// When the notification is sent, it is considered delivered.
130    fn send(
131        &mut self,
132        cx: &mut Context<'_>,
133        (notification_id, notification): &(usize, ExExNotification<N>),
134    ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
135        if let Some(finished_height) = self.finished_height {
136            match notification {
137                ExExNotification::ChainCommitted { new } => {
138                    // Skip the chain commit notification if the finished height of the ExEx is
139                    // higher than or equal to the tip of the new notification.
140                    // I.e., the ExEx has already processed the notification.
141                    if finished_height.number >= new.tip().number() {
142                        debug!(
143                            target: "exex::manager",
144                            exex_id = %self.id,
145                            %notification_id,
146                            ?finished_height,
147                            new_tip = %new.tip().number(),
148                            "Skipping notification"
149                        );
150
151                        self.next_notification_id = notification_id + 1;
152                        return Poll::Ready(Ok(()))
153                    }
154                }
155                // Do not handle [ExExNotification::ChainReorged] and
156                // [ExExNotification::ChainReverted] cases and always send the
157                // notification, because the ExEx should be aware of the reorgs and reverts lower
158                // than its finished height
159                ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
160            }
161        }
162
163        debug!(
164            target: "exex::manager",
165            exex_id = %self.id,
166            %notification_id,
167            "Reserving slot for notification"
168        );
169        match self.sender.poll_reserve(cx) {
170            Poll::Ready(Ok(())) => (),
171            other => return other,
172        }
173
174        debug!(
175            target: "exex::manager",
176            exex_id = %self.id,
177            %notification_id,
178            "Sending notification"
179        );
180        match self.sender.send_item(notification.clone()) {
181            Ok(()) => {
182                self.next_notification_id = notification_id + 1;
183                self.metrics.notifications_sent_total.increment(1);
184                Poll::Ready(Ok(()))
185            }
186            Err(err) => Poll::Ready(Err(err)),
187        }
188    }
189}
190
191/// Metrics for the `ExEx` manager.
192#[derive(Metrics)]
193#[metrics(scope = "exex.manager")]
194pub struct ExExManagerMetrics {
195    /// Max size of the internal state notifications buffer.
196    max_capacity: Gauge,
197    /// Current capacity of the internal state notifications buffer.
198    current_capacity: Gauge,
199    /// Current size of the internal state notifications buffer.
200    ///
201    /// Note that this might be slightly bigger than the maximum capacity in some cases.
202    buffer_size: Gauge,
203    /// Current number of `ExEx`'s on the node.
204    num_exexs: Gauge,
205}
206
207/// The execution extension manager.
208///
209/// The manager is responsible for:
210///
211/// - Receiving relevant events from the rest of the node, and sending these to the execution
212///   extensions
213/// - Backpressure
214/// - Error handling
215/// - Monitoring
216#[derive(Debug)]
217pub struct ExExManager<P, N: NodePrimitives> {
218    /// Provider for querying headers.
219    provider: P,
220
221    /// Handles to communicate with the `ExEx`'s.
222    exex_handles: Vec<ExExHandle<N>>,
223
224    /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
225    handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
226
227    /// The minimum notification ID currently present in the buffer.
228    min_id: usize,
229    /// Monotonically increasing ID for [`ExExNotification`]s.
230    next_id: usize,
231    /// Internal buffer of [`ExExNotification`]s.
232    ///
233    /// The first element of the tuple is a monotonically increasing ID unique to the notification
234    /// (the second element of the tuple).
235    buffer: VecDeque<(usize, ExExNotification<N>)>,
236    /// Max size of the internal state notifications buffer.
237    max_capacity: usize,
238    /// Current state notifications buffer capacity.
239    ///
240    /// Used to inform the execution stage of possible batch sizes.
241    current_capacity: Arc<AtomicUsize>,
242
243    /// Whether the manager is ready to receive new notifications.
244    is_ready: watch::Sender<bool>,
245
246    /// The finished height of all `ExEx`'s.
247    finished_height: watch::Sender<FinishedExExHeight>,
248
249    /// Write-Ahead Log for the [`ExExNotification`]s.
250    wal: Wal<N>,
251    /// A stream of finalized headers.
252    finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
253    /// The threshold for the number of blocks in the WAL before emitting a warning.
254    wal_blocks_warning: usize,
255
256    /// A handle to the `ExEx` manager.
257    handle: ExExManagerHandle<N>,
258    /// Metrics for the `ExEx` manager.
259    metrics: ExExManagerMetrics,
260}
261
262impl<P, N> ExExManager<P, N>
263where
264    N: NodePrimitives,
265{
266    /// Create a new [`ExExManager`].
267    ///
268    /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
269    /// notification buffer in the manager.
270    ///
271    /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
272    /// notifications over [`ExExManagerHandle`]s until there is capacity again.
273    pub fn new(
274        provider: P,
275        handles: Vec<ExExHandle<N>>,
276        max_capacity: usize,
277        wal: Wal<N>,
278        finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
279    ) -> Self {
280        let num_exexs = handles.len();
281
282        let (handle_tx, handle_rx) = mpsc::unbounded_channel();
283        let (is_ready_tx, is_ready_rx) = watch::channel(true);
284        let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
285            FinishedExExHeight::NoExExs
286        } else {
287            FinishedExExHeight::NotReady
288        });
289
290        let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
291
292        let metrics = ExExManagerMetrics::default();
293        metrics.max_capacity.set(max_capacity as f64);
294        metrics.num_exexs.set(num_exexs as f64);
295
296        Self {
297            provider,
298
299            exex_handles: handles,
300
301            handle_rx,
302
303            min_id: 0,
304            next_id: 0,
305            buffer: VecDeque::with_capacity(max_capacity),
306            max_capacity,
307            current_capacity: Arc::clone(&current_capacity),
308
309            is_ready: is_ready_tx,
310            finished_height: finished_height_tx,
311
312            wal,
313            finalized_header_stream,
314            wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
315
316            handle: ExExManagerHandle {
317                exex_tx: handle_tx,
318                num_exexs,
319                is_ready_receiver: is_ready_rx.clone(),
320                is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
321                current_capacity,
322                finished_height: finished_height_rx,
323            },
324            metrics,
325        }
326    }
327
328    /// Returns the handle to the manager.
329    pub fn handle(&self) -> ExExManagerHandle<N> {
330        self.handle.clone()
331    }
332
333    /// Sets the threshold for the number of blocks in the WAL before emitting a warning.
334    ///
335    /// For L2 chains with faster block times, this value should be increased proportionally
336    /// to avoid excessive warnings. For example, a chain with 2-second block times might use
337    /// a value 6x higher than the default.
338    pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
339        self.wal_blocks_warning = threshold;
340        self
341    }
342
343    /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
344    /// readiness to receive notifications.
345    fn update_capacity(&self) {
346        let capacity = self.max_capacity.saturating_sub(self.buffer.len());
347        self.current_capacity.store(capacity, Ordering::Relaxed);
348        self.metrics.current_capacity.set(capacity as f64);
349        self.metrics.buffer_size.set(self.buffer.len() as f64);
350
351        // we can safely ignore if the channel is closed, since the manager always holds it open
352        // internally
353        let _ = self.is_ready.send(capacity > 0);
354    }
355
356    /// Pushes a new notification into the managers internal buffer, assigning the notification a
357    /// unique ID.
358    fn push_notification(&mut self, notification: ExExNotification<N>) {
359        let next_id = self.next_id;
360        self.buffer.push_back((next_id, notification));
361        self.next_id += 1;
362    }
363}
364
365impl<P, N> ExExManager<P, N>
366where
367    P: HeaderProvider,
368    N: NodePrimitives,
369{
370    /// Finalizes the WAL according to the passed finalized header.
371    ///
372    /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
373    /// necessary.
374    fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
375        debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
376
377        // Check if all ExExes are on the canonical chain
378        let exex_finished_heights = self
379            .exex_handles
380            .iter()
381            // Get ID and finished height for each ExEx
382            .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
383            // Deduplicate all hashes
384            .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
385            // Check if hashes are canonical
386            .map(|(exex_id, num_hash)| {
387                num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
388                    self.provider
389                        .is_known(num_hash.hash)
390                        // Save the ExEx ID, finished height, and whether the hash is canonical
391                        .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
392                })
393            })
394            // We collect here to be able to log the unfinalized ExExes below
395            .collect::<Result<Vec<_>, _>>()?;
396        if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
397            // If there is a finalized header and all ExExs are on the canonical chain, finalize
398            // the WAL with either the lowest finished height among all ExExes, or finalized header
399            // – whichever is lower.
400            let lowest_finished_height = exex_finished_heights
401                .iter()
402                .copied()
403                .filter_map(|(_, num_hash, _)| num_hash)
404                .chain([(finalized_header.num_hash())])
405                .min_by_key(|num_hash| num_hash.number)
406                .unwrap();
407
408            self.wal.finalize(lowest_finished_height)?;
409            if self.wal.num_blocks() > self.wal_blocks_warning {
410                warn!(
411                    target: "exex::manager",
412                    blocks = ?self.wal.num_blocks(),
413                    threshold = self.wal_blocks_warning,
414                    "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
415                );
416            }
417        } else {
418            let unfinalized_exexes = exex_finished_heights
419                .into_iter()
420                .filter_map(|(exex_id, num_hash, is_canonical)| {
421                    is_canonical.not().then_some((exex_id, num_hash))
422                })
423                .format_with(", ", |(exex_id, num_hash), f| {
424                    f(&format_args!("{exex_id} = {num_hash:?}"))
425                })
426                // We need this because `debug!` uses the argument twice when formatting the final
427                // log message, but the result of `format_with` can only be used once
428                .to_string();
429            debug!(
430                target: "exex::manager",
431                %unfinalized_exexes,
432                "Not all ExExes are on the canonical chain, can't finalize the WAL"
433            );
434        }
435
436        Ok(())
437    }
438}
439
440impl<P, N> Future for ExExManager<P, N>
441where
442    P: HeaderProvider + Unpin + 'static,
443    N: NodePrimitives,
444{
445    type Output = eyre::Result<()>;
446
447    /// Main loop of the [`ExExManager`]. The order of operations is as follows:
448    /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
449    ///    the latest state of [`ExExEvent::FinishedHeight`] events.
450    /// 2. Finalize the WAL with the finalized header, if necessary.
451    /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
452    ///    the internal buffer capacity.
453    /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new
454    ///    notifications.
455    /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
456    ///    update the internal buffer capacity.
457    /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
458    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
459        let this = self.get_mut();
460
461        // Handle incoming ExEx events
462        for exex in &mut this.exex_handles {
463            while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
464                debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
465                exex.metrics.events_sent_total.increment(1);
466                match event {
467                    ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
468                }
469            }
470        }
471
472        // Drain the finalized header stream and finalize the WAL with the last header
473        let mut last_finalized_header = None;
474        while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
475            last_finalized_header = finalized_header;
476        }
477        if let Some(header) = last_finalized_header {
478            this.finalize_wal(header)?;
479        }
480
481        // Drain handle notifications
482        while this.buffer.len() < this.max_capacity {
483            if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
484                let committed_tip =
485                    notification.committed_chain().map(|chain| chain.tip().number());
486                let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
487                debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
488
489                // Commit to WAL only notifications from blockchain tree. Pipeline notifications
490                // always contain only finalized blocks.
491                match source {
492                    ExExNotificationSource::BlockchainTree => {
493                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
494                        this.wal.commit(&notification)?;
495                    }
496                    ExExNotificationSource::Pipeline => {
497                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
498                    }
499                }
500
501                this.push_notification(notification);
502                continue
503            }
504            break
505        }
506        let buffer_full = this.buffer.len() >= this.max_capacity;
507
508        // Advance all poll senders
509        let mut min_id = usize::MAX;
510        for idx in (0..this.exex_handles.len()).rev() {
511            let mut exex = this.exex_handles.swap_remove(idx);
512
513            // It is a logic error for this to ever underflow since the manager manages the
514            // notification IDs
515            let notification_index = exex
516                .next_notification_id
517                .checked_sub(this.min_id)
518                .expect("exex expected notification ID outside the manager's range");
519            if let Some(notification) = this.buffer.get(notification_index) &&
520                let Poll::Ready(Err(err)) = exex.send(cx, notification)
521            {
522                // The channel was closed, which is irrecoverable for the manager
523                return Poll::Ready(Err(err.into()))
524            }
525            min_id = min_id.min(exex.next_notification_id);
526            this.exex_handles.push(exex);
527        }
528
529        // Remove processed buffered notifications
530        debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
531        this.buffer.retain(|&(id, _)| id >= min_id);
532        this.min_id = min_id;
533
534        // Update capacity
535        this.update_capacity();
536
537        // If the buffer was full and we made space, we need to wake up to accept new notifications
538        if buffer_full && this.buffer.len() < this.max_capacity {
539            debug!(target: "exex::manager", "Buffer has space again, waking up senders");
540            cx.waker().wake_by_ref();
541        }
542
543        // Update watch channel block number
544        let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
545            exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
546        });
547        if let Ok(finished_height) = finished_height {
548            let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
549        }
550
551        Poll::Pending
552    }
553}
554
555/// A handle to communicate with the [`ExExManager`].
556#[derive(Debug)]
557pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
558    /// Channel to send notifications to the `ExEx` manager.
559    exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
560    /// The number of `ExEx`'s running on the node.
561    num_exexs: usize,
562    /// A watch channel denoting whether the manager is ready for new notifications or not.
563    ///
564    /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value.
565    /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned,
566    /// but is otherwise unused.
567    is_ready_receiver: watch::Receiver<bool>,
568    /// A reusable future that resolves when the manager is ready for new
569    /// notifications.
570    is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
571    /// The current capacity of the manager's internal notification buffer.
572    current_capacity: Arc<AtomicUsize>,
573    /// The finished height of all `ExEx`'s.
574    finished_height: watch::Receiver<FinishedExExHeight>,
575}
576
577impl<N: NodePrimitives> ExExManagerHandle<N> {
578    /// Creates an empty manager handle.
579    ///
580    /// Use this if there is no manager present.
581    ///
582    /// The handle will always be ready, and have a capacity of 0.
583    pub fn empty() -> Self {
584        let (exex_tx, _) = mpsc::unbounded_channel();
585        let (_, is_ready_rx) = watch::channel(true);
586        let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
587
588        Self {
589            exex_tx,
590            num_exexs: 0,
591            is_ready_receiver: is_ready_rx.clone(),
592            is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
593            current_capacity: Arc::new(AtomicUsize::new(0)),
594            finished_height: finished_height_rx,
595        }
596    }
597
598    /// Synchronously send a notification over the channel to all execution extensions.
599    ///
600    /// Senders should call [`Self::has_capacity`] first.
601    pub fn send(
602        &self,
603        source: ExExNotificationSource,
604        notification: ExExNotification<N>,
605    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
606        self.exex_tx.send((source, notification))
607    }
608
609    /// Asynchronously send a notification over the channel to all execution extensions.
610    ///
611    /// The returned future resolves when the notification has been delivered. If there is no
612    /// capacity in the channel, the future will wait.
613    pub async fn send_async(
614        &mut self,
615        source: ExExNotificationSource,
616        notification: ExExNotification<N>,
617    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
618        self.ready().await;
619        self.exex_tx.send((source, notification))
620    }
621
622    /// Get the current capacity of the `ExEx` manager's internal notification buffer.
623    pub fn capacity(&self) -> usize {
624        self.current_capacity.load(Ordering::Relaxed)
625    }
626
627    /// Whether there is capacity in the `ExEx` manager's internal notification buffer.
628    ///
629    /// If this returns `false`, the owner of the handle should **NOT** send new notifications over
630    /// the channel until the manager is ready again, as this can lead to unbounded memory growth.
631    pub fn has_capacity(&self) -> bool {
632        self.capacity() > 0
633    }
634
635    /// Returns `true` if there are `ExEx`'s installed in the node.
636    pub const fn has_exexs(&self) -> bool {
637        self.num_exexs > 0
638    }
639
640    /// The finished height of all `ExEx`'s.
641    pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
642        self.finished_height.clone()
643    }
644
645    /// Wait until the manager is ready for new notifications.
646    pub async fn ready(&mut self) {
647        poll_fn(|cx| self.poll_ready(cx)).await
648    }
649
650    /// Wait until the manager is ready for new notifications.
651    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
652        let rx = ready!(self.is_ready.poll(cx));
653        self.is_ready.set(make_wait_future(rx));
654        Poll::Ready(())
655    }
656}
657
658/// Creates a future that resolves once the given watch channel receiver is true.
659async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
660    // NOTE(onbjerg): We can ignore the error here, because if the channel is closed, the node
661    // is shutting down.
662    let _ = rx.wait_for(|ready| *ready).await;
663    rx
664}
665
666impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
667    fn clone(&self) -> Self {
668        Self {
669            exex_tx: self.exex_tx.clone(),
670            num_exexs: self.num_exexs,
671            is_ready_receiver: self.is_ready_receiver.clone(),
672            is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
673            current_capacity: self.current_capacity.clone(),
674            finished_height: self.finished_height.clone(),
675        }
676    }
677}
678
679#[cfg(test)]
680mod tests {
681    use super::*;
682    use crate::wal::WalResult;
683    use alloy_primitives::B256;
684    use futures::{StreamExt, TryStreamExt};
685    use rand::Rng;
686    use reth_db_common::init::init_genesis;
687    use reth_evm_ethereum::EthEvmConfig;
688    use reth_primitives_traits::RecoveredBlock;
689    use reth_provider::{
690        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
691        BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
692    };
693    use reth_testing_utils::generators::{self, random_block, BlockParams};
694
695    fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
696        let (tx, rx) = watch::channel(None);
697        // Do not drop the sender, otherwise the receiver will always return an error
698        std::mem::forget(tx);
699        ForkChoiceStream::new(rx)
700    }
701
702    #[tokio::test]
703    async fn test_delivers_events() {
704        let temp_dir = tempfile::tempdir().unwrap();
705        let wal = Wal::new(temp_dir.path()).unwrap();
706
707        let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
708            "test_exex".to_string(),
709            Default::default(),
710            (),
711            EthEvmConfig::mainnet(),
712            wal.handle(),
713        );
714
715        // Send an event and check that it's delivered correctly
716        let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
717        event_tx.send(event).unwrap();
718        let received_event = exex_handle.receiver.recv().await.unwrap();
719        assert_eq!(received_event, event);
720    }
721
722    #[tokio::test]
723    async fn test_has_exexs() {
724        let temp_dir = tempfile::tempdir().unwrap();
725        let wal = Wal::new(temp_dir.path()).unwrap();
726
727        let (exex_handle_1, _, _) = ExExHandle::new(
728            "test_exex_1".to_string(),
729            Default::default(),
730            (),
731            EthEvmConfig::mainnet(),
732            wal.handle(),
733        );
734
735        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
736            .handle
737            .has_exexs());
738
739        assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
740            .handle
741            .has_exexs());
742    }
743
744    #[tokio::test]
745    async fn test_has_capacity() {
746        let temp_dir = tempfile::tempdir().unwrap();
747        let wal = Wal::new(temp_dir.path()).unwrap();
748
749        let (exex_handle_1, _, _) = ExExHandle::new(
750            "test_exex_1".to_string(),
751            Default::default(),
752            (),
753            EthEvmConfig::mainnet(),
754            wal.handle(),
755        );
756
757        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
758            .handle
759            .has_capacity());
760
761        assert!(ExExManager::new(
762            (),
763            vec![exex_handle_1],
764            10,
765            wal,
766            empty_finalized_header_stream()
767        )
768        .handle
769        .has_capacity());
770    }
771
772    #[test]
773    fn test_push_notification() {
774        let temp_dir = tempfile::tempdir().unwrap();
775        let wal = Wal::new(temp_dir.path()).unwrap();
776
777        let (exex_handle, _, _) = ExExHandle::new(
778            "test_exex".to_string(),
779            Default::default(),
780            (),
781            EthEvmConfig::mainnet(),
782            wal.handle(),
783        );
784
785        // Create a mock ExExManager and add the exex_handle to it
786        let mut exex_manager =
787            ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
788
789        // Define the notification for testing
790        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
791        block1.set_hash(B256::new([0x01; 32]));
792        block1.set_block_number(10);
793
794        let notification1 = ExExNotification::ChainCommitted {
795            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
796        };
797
798        // Push the first notification
799        exex_manager.push_notification(notification1.clone());
800
801        // Verify the buffer contains the notification with the correct ID
802        assert_eq!(exex_manager.buffer.len(), 1);
803        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
804        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
805        assert_eq!(exex_manager.next_id, 1);
806
807        // Push another notification
808        let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
809        block2.set_hash(B256::new([0x02; 32]));
810        block2.set_block_number(20);
811
812        let notification2 = ExExNotification::ChainCommitted {
813            new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
814        };
815
816        exex_manager.push_notification(notification2.clone());
817
818        // Verify the buffer contains both notifications with correct IDs
819        assert_eq!(exex_manager.buffer.len(), 2);
820        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
821        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
822        assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
823        assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
824        assert_eq!(exex_manager.next_id, 2);
825    }
826
827    #[test]
828    fn test_update_capacity() {
829        let temp_dir = tempfile::tempdir().unwrap();
830        let wal = Wal::new(temp_dir.path()).unwrap();
831
832        let (exex_handle, _, _) = ExExHandle::new(
833            "test_exex".to_string(),
834            Default::default(),
835            (),
836            EthEvmConfig::mainnet(),
837            wal.handle(),
838        );
839
840        // Create a mock ExExManager and add the exex_handle to it
841        let max_capacity = 5;
842        let mut exex_manager = ExExManager::new(
843            (),
844            vec![exex_handle],
845            max_capacity,
846            wal,
847            empty_finalized_header_stream(),
848        );
849
850        // Push some notifications to fill part of the buffer
851        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
852        block1.set_hash(B256::new([0x01; 32]));
853        block1.set_block_number(10);
854
855        let notification1 = ExExNotification::ChainCommitted {
856            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
857        };
858
859        exex_manager.push_notification(notification1.clone());
860        exex_manager.push_notification(notification1);
861
862        // Update capacity
863        exex_manager.update_capacity();
864
865        // Verify current capacity and metrics
866        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
867
868        // Clear the buffer and update capacity
869        exex_manager.buffer.clear();
870        exex_manager.update_capacity();
871
872        // Verify current capacity
873        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
874    }
875
876    #[tokio::test]
877    async fn test_updates_block_height() {
878        let temp_dir = tempfile::tempdir().unwrap();
879        let wal = Wal::new(temp_dir.path()).unwrap();
880
881        let provider_factory = create_test_provider_factory();
882
883        let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
884            "test_exex".to_string(),
885            Default::default(),
886            (),
887            EthEvmConfig::mainnet(),
888            wal.handle(),
889        );
890
891        // Check initial block height
892        assert!(exex_handle.finished_height.is_none());
893
894        // Update the block height via an event
895        let block = BlockNumHash::new(42, B256::random());
896        event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
897
898        // Create a mock ExExManager and add the exex_handle to it
899        let exex_manager = ExExManager::new(
900            provider_factory,
901            vec![exex_handle],
902            10,
903            Wal::new(temp_dir.path()).unwrap(),
904            empty_finalized_header_stream(),
905        );
906
907        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
908
909        // Pin the ExExManager to call the poll method
910        let mut pinned_manager = std::pin::pin!(exex_manager);
911        let _ = pinned_manager.as_mut().poll(&mut cx);
912
913        // Check that the block height was updated
914        let updated_exex_handle = &pinned_manager.exex_handles[0];
915        assert_eq!(updated_exex_handle.finished_height, Some(block));
916
917        // Get the receiver for the finished height
918        let mut receiver = pinned_manager.handle.finished_height();
919
920        // Wait for a new value to be sent
921        receiver.changed().await.unwrap();
922
923        // Get the latest value
924        let finished_height = *receiver.borrow();
925
926        // The finished height should be updated to the lower block height
927        assert_eq!(finished_height, FinishedExExHeight::Height(42));
928    }
929
930    #[tokio::test]
931    async fn test_updates_block_height_lower() {
932        let temp_dir = tempfile::tempdir().unwrap();
933        let wal = Wal::new(temp_dir.path()).unwrap();
934
935        let provider_factory = create_test_provider_factory();
936
937        // Create two `ExExHandle` instances
938        let (exex_handle1, event_tx1, _) = ExExHandle::new(
939            "test_exex1".to_string(),
940            Default::default(),
941            (),
942            EthEvmConfig::mainnet(),
943            wal.handle(),
944        );
945        let (exex_handle2, event_tx2, _) = ExExHandle::new(
946            "test_exex2".to_string(),
947            Default::default(),
948            (),
949            EthEvmConfig::mainnet(),
950            wal.handle(),
951        );
952
953        let block1 = BlockNumHash::new(42, B256::random());
954        let block2 = BlockNumHash::new(10, B256::random());
955
956        // Send events to update the block heights of the two handles, with the second being lower
957        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
958        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
959
960        let exex_manager = ExExManager::new(
961            provider_factory,
962            vec![exex_handle1, exex_handle2],
963            10,
964            Wal::new(temp_dir.path()).unwrap(),
965            empty_finalized_header_stream(),
966        );
967
968        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
969
970        let mut pinned_manager = std::pin::pin!(exex_manager);
971
972        let _ = pinned_manager.as_mut().poll(&mut cx);
973
974        // Get the receiver for the finished height
975        let mut receiver = pinned_manager.handle.finished_height();
976
977        // Wait for a new value to be sent
978        receiver.changed().await.unwrap();
979
980        // Get the latest value
981        let finished_height = *receiver.borrow();
982
983        // The finished height should be updated to the lower block height
984        assert_eq!(finished_height, FinishedExExHeight::Height(10));
985    }
986
987    #[tokio::test]
988    async fn test_updates_block_height_greater() {
989        let temp_dir = tempfile::tempdir().unwrap();
990        let wal = Wal::new(temp_dir.path()).unwrap();
991
992        let provider_factory = create_test_provider_factory();
993
994        // Create two `ExExHandle` instances
995        let (exex_handle1, event_tx1, _) = ExExHandle::new(
996            "test_exex1".to_string(),
997            Default::default(),
998            (),
999            EthEvmConfig::mainnet(),
1000            wal.handle(),
1001        );
1002        let (exex_handle2, event_tx2, _) = ExExHandle::new(
1003            "test_exex2".to_string(),
1004            Default::default(),
1005            (),
1006            EthEvmConfig::mainnet(),
1007            wal.handle(),
1008        );
1009
1010        // Assert that the initial block height is `None` for the first `ExExHandle`.
1011        assert!(exex_handle1.finished_height.is_none());
1012
1013        let block1 = BlockNumHash::new(42, B256::random());
1014        let block2 = BlockNumHash::new(100, B256::random());
1015
1016        // Send events to update the block heights of the two handles, with the second being higher.
1017        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
1018        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
1019
1020        let exex_manager = ExExManager::new(
1021            provider_factory,
1022            vec![exex_handle1, exex_handle2],
1023            10,
1024            Wal::new(temp_dir.path()).unwrap(),
1025            empty_finalized_header_stream(),
1026        );
1027
1028        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1029
1030        let mut pinned_manager = std::pin::pin!(exex_manager);
1031
1032        let _ = pinned_manager.as_mut().poll(&mut cx);
1033
1034        // Get the receiver for the finished height
1035        let mut receiver = pinned_manager.handle.finished_height();
1036
1037        // Wait for a new value to be sent
1038        receiver.changed().await.unwrap();
1039
1040        // Get the latest value
1041        let finished_height = *receiver.borrow();
1042
1043        // The finished height should be updated to the lower block height
1044        assert_eq!(finished_height, FinishedExExHeight::Height(42));
1045
1046        // // The lower block height should be retained
1047        // let updated_exex_handle = &pinned_manager.exex_handles[0];
1048        // assert_eq!(updated_exex_handle.finished_height, Some(42));
1049    }
1050
1051    #[tokio::test]
1052    async fn test_exex_manager_capacity() {
1053        let temp_dir = tempfile::tempdir().unwrap();
1054        let wal = Wal::new(temp_dir.path()).unwrap();
1055
1056        let provider_factory = create_test_provider_factory();
1057
1058        let (exex_handle_1, _, _) = ExExHandle::new(
1059            "test_exex_1".to_string(),
1060            Default::default(),
1061            (),
1062            EthEvmConfig::mainnet(),
1063            wal.handle(),
1064        );
1065
1066        // Create an ExExManager with a small max capacity
1067        let max_capacity = 2;
1068        let exex_manager = ExExManager::new(
1069            provider_factory,
1070            vec![exex_handle_1],
1071            max_capacity,
1072            Wal::new(temp_dir.path()).unwrap(),
1073            empty_finalized_header_stream(),
1074        );
1075
1076        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1077
1078        // Setup a notification
1079        let notification = ExExNotification::ChainCommitted {
1080            new: Arc::new(Chain::new(
1081                vec![Default::default()],
1082                Default::default(),
1083                Default::default(),
1084            )),
1085        };
1086
1087        // Send notifications to go over the max capacity
1088        exex_manager
1089            .handle
1090            .exex_tx
1091            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1092            .unwrap();
1093        exex_manager
1094            .handle
1095            .exex_tx
1096            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1097            .unwrap();
1098        exex_manager
1099            .handle
1100            .exex_tx
1101            .send((ExExNotificationSource::BlockchainTree, notification))
1102            .unwrap();
1103
1104        // Pin the ExExManager to call the poll method
1105        let mut pinned_manager = std::pin::pin!(exex_manager);
1106
1107        // Before polling, the next notification ID should be 0 and the buffer should be empty
1108        assert_eq!(pinned_manager.next_id, 0);
1109        assert_eq!(pinned_manager.buffer.len(), 0);
1110
1111        let _ = pinned_manager.as_mut().poll(&mut cx);
1112
1113        // After polling, the next notification ID and buffer size should be updated
1114        assert_eq!(pinned_manager.next_id, 2);
1115        assert_eq!(pinned_manager.buffer.len(), 2);
1116    }
1117
1118    #[tokio::test]
1119    async fn exex_handle_new() {
1120        let provider_factory = create_test_provider_factory();
1121        init_genesis(&provider_factory).unwrap();
1122        let provider = BlockchainProvider::new(provider_factory).unwrap();
1123
1124        let temp_dir = tempfile::tempdir().unwrap();
1125        let wal = Wal::new(temp_dir.path()).unwrap();
1126
1127        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1128            "test_exex".to_string(),
1129            Default::default(),
1130            provider,
1131            EthEvmConfig::mainnet(),
1132            wal.handle(),
1133        );
1134
1135        // Check initial state
1136        assert_eq!(exex_handle.id, "test_exex");
1137        assert_eq!(exex_handle.next_notification_id, 0);
1138
1139        // Setup two blocks for the chain commit notification
1140        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1141        block1.set_hash(B256::new([0x01; 32]));
1142        block1.set_block_number(10);
1143
1144        let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1145        block2.set_hash(B256::new([0x02; 32]));
1146        block2.set_block_number(11);
1147
1148        // Setup a notification
1149        let notification = ExExNotification::ChainCommitted {
1150            new: Arc::new(Chain::new(
1151                vec![Default::default()],
1152                Default::default(),
1153                Default::default(),
1154            )),
1155        };
1156
1157        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1158
1159        // Send a notification and ensure it's received correctly
1160        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1161            Poll::Ready(Ok(())) => {
1162                let received_notification = notifications.next().await.unwrap().unwrap();
1163                assert_eq!(received_notification, notification);
1164            }
1165            Poll::Pending => panic!("Notification send is pending"),
1166            Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1167        }
1168
1169        // Ensure the notification ID was incremented
1170        assert_eq!(exex_handle.next_notification_id, 23);
1171    }
1172
1173    #[tokio::test]
1174    async fn test_notification_if_finished_height_gt_chain_tip() {
1175        let provider_factory = create_test_provider_factory();
1176        init_genesis(&provider_factory).unwrap();
1177        let provider = BlockchainProvider::new(provider_factory).unwrap();
1178
1179        let temp_dir = tempfile::tempdir().unwrap();
1180        let wal = Wal::new(temp_dir.path()).unwrap();
1181
1182        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1183            "test_exex".to_string(),
1184            Default::default(),
1185            provider,
1186            EthEvmConfig::mainnet(),
1187            wal.handle(),
1188        );
1189
1190        // Set finished_height to a value higher than the block tip
1191        exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1192
1193        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1194        block1.set_hash(B256::new([0x01; 32]));
1195        block1.set_block_number(10);
1196
1197        let notification = ExExNotification::ChainCommitted {
1198            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1199        };
1200
1201        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1202
1203        // Send the notification
1204        match exex_handle.send(&mut cx, &(22, notification)) {
1205            Poll::Ready(Ok(())) => {
1206                poll_fn(|cx| {
1207                    // The notification should be skipped, so nothing should be sent.
1208                    // Check that the receiver channel is indeed empty
1209                    assert!(notifications.poll_next_unpin(cx).is_pending());
1210                    Poll::Ready(())
1211                })
1212                .await;
1213            }
1214            Poll::Pending | Poll::Ready(Err(_)) => {
1215                panic!("Notification should not be pending or fail");
1216            }
1217        }
1218
1219        // Ensure the notification ID was still incremented
1220        assert_eq!(exex_handle.next_notification_id, 23);
1221    }
1222
1223    #[tokio::test]
1224    async fn test_sends_chain_reorged_notification() {
1225        let provider_factory = create_test_provider_factory();
1226        init_genesis(&provider_factory).unwrap();
1227        let provider = BlockchainProvider::new(provider_factory).unwrap();
1228
1229        let temp_dir = tempfile::tempdir().unwrap();
1230        let wal = Wal::new(temp_dir.path()).unwrap();
1231
1232        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1233            "test_exex".to_string(),
1234            Default::default(),
1235            provider,
1236            EthEvmConfig::mainnet(),
1237            wal.handle(),
1238        );
1239
1240        let notification = ExExNotification::ChainReorged {
1241            old: Arc::new(Chain::default()),
1242            new: Arc::new(Chain::default()),
1243        };
1244
1245        // Even if the finished height is higher than the tip of the new chain, the reorg
1246        // notification should be received
1247        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1248
1249        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1250
1251        // Send the notification
1252        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1253            Poll::Ready(Ok(())) => {
1254                let received_notification = notifications.next().await.unwrap().unwrap();
1255                assert_eq!(received_notification, notification);
1256            }
1257            Poll::Pending | Poll::Ready(Err(_)) => {
1258                panic!("Notification should not be pending or fail")
1259            }
1260        }
1261
1262        // Ensure the notification ID was incremented
1263        assert_eq!(exex_handle.next_notification_id, 23);
1264    }
1265
1266    #[tokio::test]
1267    async fn test_sends_chain_reverted_notification() {
1268        let provider_factory = create_test_provider_factory();
1269        init_genesis(&provider_factory).unwrap();
1270        let provider = BlockchainProvider::new(provider_factory).unwrap();
1271
1272        let temp_dir = tempfile::tempdir().unwrap();
1273        let wal = Wal::new(temp_dir.path()).unwrap();
1274
1275        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1276            "test_exex".to_string(),
1277            Default::default(),
1278            provider,
1279            EthEvmConfig::mainnet(),
1280            wal.handle(),
1281        );
1282
1283        let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1284
1285        // Even if the finished height is higher than the tip of the new chain, the reorg
1286        // notification should be received
1287        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1288
1289        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1290
1291        // Send the notification
1292        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1293            Poll::Ready(Ok(())) => {
1294                let received_notification = notifications.next().await.unwrap().unwrap();
1295                assert_eq!(received_notification, notification);
1296            }
1297            Poll::Pending | Poll::Ready(Err(_)) => {
1298                panic!("Notification should not be pending or fail")
1299            }
1300        }
1301
1302        // Ensure the notification ID was incremented
1303        assert_eq!(exex_handle.next_notification_id, 23);
1304    }
1305
1306    #[tokio::test]
1307    async fn test_exex_wal() -> eyre::Result<()> {
1308        reth_tracing::init_test_tracing();
1309
1310        let mut rng = generators::rng();
1311
1312        let provider_factory = create_test_provider_factory();
1313        let genesis_hash = init_genesis(&provider_factory).unwrap();
1314        let genesis_block = provider_factory
1315            .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1316            .unwrap()
1317            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1318
1319        let block = random_block(
1320            &mut rng,
1321            genesis_block.number + 1,
1322            BlockParams { parent: Some(genesis_hash), ..Default::default() },
1323        )
1324        .try_recover()
1325        .unwrap();
1326        let provider_rw = provider_factory.database_provider_rw().unwrap();
1327        provider_rw.insert_block(&block).unwrap();
1328        provider_rw.commit().unwrap();
1329
1330        let provider = BlockchainProvider::new(provider_factory).unwrap();
1331
1332        let temp_dir = tempfile::tempdir().unwrap();
1333        let wal = Wal::new(temp_dir.path()).unwrap();
1334
1335        let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1336            "test_exex".to_string(),
1337            Default::default(),
1338            provider.clone(),
1339            EthEvmConfig::mainnet(),
1340            wal.handle(),
1341        );
1342
1343        let genesis_notification = ExExNotification::ChainCommitted {
1344            new: Arc::new(Chain::new(
1345                vec![genesis_block.clone()],
1346                Default::default(),
1347                Default::default(),
1348            )),
1349        };
1350        let notification = ExExNotification::ChainCommitted {
1351            new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
1352        };
1353
1354        let (finalized_headers_tx, rx) = watch::channel(None);
1355        finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1356        let finalized_header_stream = ForkChoiceStream::new(rx);
1357
1358        let mut exex_manager = std::pin::pin!(ExExManager::new(
1359            provider,
1360            vec![exex_handle],
1361            2,
1362            wal,
1363            finalized_header_stream
1364        ));
1365
1366        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1367
1368        exex_manager
1369            .handle()
1370            .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1371        exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1372
1373        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1374        assert_eq!(
1375            notifications.try_poll_next_unpin(&mut cx)?,
1376            Poll::Ready(Some(genesis_notification))
1377        );
1378        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1379        assert_eq!(
1380            notifications.try_poll_next_unpin(&mut cx)?,
1381            Poll::Ready(Some(notification.clone()))
1382        );
1383        // WAL shouldn't contain the genesis notification, because it's finalized
1384        assert_eq!(
1385            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1386            std::slice::from_ref(&notification)
1387        );
1388
1389        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1390        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1391        // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
1392        assert_eq!(
1393            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1394            std::slice::from_ref(&notification)
1395        );
1396
1397        // Send a `FinishedHeight` event with a non-canonical block
1398        events_tx
1399            .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1400            .unwrap();
1401
1402        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1403        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1404        // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
1405        // non-canonical block
1406        assert_eq!(
1407            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1408            std::slice::from_ref(&notification)
1409        );
1410
1411        // Send a `FinishedHeight` event with a canonical block
1412        events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1413
1414        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1415        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1416        // WAL is finalized
1417        assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1418
1419        Ok(())
1420    }
1421
1422    #[tokio::test]
1423    async fn test_deadlock_manager_wakes_after_buffer_clears() {
1424        // This test simulates the scenario where the buffer fills up, ingestion pauses,
1425        // and then space clears. We verify the manager wakes up to process pending items.
1426
1427        let temp_dir = tempfile::tempdir().unwrap();
1428        let wal = Wal::new(temp_dir.path()).unwrap();
1429        let provider_factory = create_test_provider_factory();
1430        init_genesis(&provider_factory).unwrap();
1431        let provider = BlockchainProvider::new(provider_factory.clone()).unwrap();
1432
1433        // 1. Setup Manager with Capacity = 1
1434        let (exex_handle, _, mut notifications) = ExExHandle::new(
1435            "test_exex".to_string(),
1436            Default::default(),
1437            provider,
1438            EthEvmConfig::mainnet(),
1439            wal.handle(),
1440        );
1441
1442        let max_capacity = 2;
1443        let exex_manager = ExExManager::new(
1444            provider_factory,
1445            vec![exex_handle],
1446            max_capacity,
1447            wal,
1448            empty_finalized_header_stream(),
1449        );
1450
1451        let manager_handle = exex_manager.handle();
1452
1453        // Spawn manager in background so it runs continuously
1454        tokio::spawn(async move {
1455            exex_manager.await.ok();
1456        });
1457
1458        // Helper to create notifications
1459        let mut rng = generators::rng();
1460        let mut make_notif = |id: u64| {
1461            let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
1462            ExExNotification::ChainCommitted {
1463                new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
1464            }
1465        };
1466
1467        manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap();
1468
1469        // Send the "Stuck" Item (Notification #100).
1470        // At this point, the Manager loop has skipped the ingestion logic because buffer is full
1471        // (buffer_full=true). This item sits in the unbounded 'handle_rx' channel waiting.
1472        manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap();
1473
1474        // 3. Relieve Pressure
1475        // We consume items from the ExEx.
1476        // As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager
1477        // frees space. Once Manager frees space, the FIX (wake_by_ref) should trigger,
1478        // causing it to read Notif #100.
1479
1480        // Consume the jam
1481        let _ = notifications.next().await.unwrap();
1482
1483        // 4. Assert No Deadlock
1484        // We expect Notification #100 next.
1485        // If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping
1486        // despite having empty buffer.
1487        let result =
1488            tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await;
1489
1490        assert!(
1491            result.is_ok(),
1492            "Deadlock detected! Manager failed to wake up and process Pending Item #100."
1493        );
1494    }
1495}