Skip to main content

reth_exex/
manager.rs

1use crate::{
2    wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::ConfigureEvm;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18    collections::VecDeque,
19    fmt::Debug,
20    future::{poll_fn, Future},
21    ops::Not,
22    pin::Pin,
23    sync::{
24        atomic::{AtomicUsize, Ordering},
25        Arc,
26    },
27    task::{ready, Context, Poll},
28};
29use tokio::sync::{
30    mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31    watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default max size of the internal state notifications buffer.
///
/// Assuming one block per notification, 1024 notifications in the buffer is roughly 3.5 hours of
/// mainnet blocks (~12-second block time), or about 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
40
/// Default maximum number of blocks allowed in the WAL before emitting a warning.
///
/// This constant defines the default threshold for the Write-Ahead Log (WAL) size. If the number
/// of blocks in the WAL exceeds this limit, a warning is logged to indicate potential issues.
///
/// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with
/// faster block times, this value should be increased proportionally to avoid excessive warnings.
///
/// [`ExExManager::new`] installs this value as the initial threshold; it can be overridden with
/// [`ExExManager::with_wal_blocks_warning`].
pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
49
/// The source of the notification.
///
/// This distinction is needed so that pipeline notifications are never committed to the
/// [WAL](`Wal`), because they are already finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The notification was sent from the pipeline.
    Pipeline,
    /// The notification was sent from the blockchain tree.
    BlockchainTree,
}
61
/// Metrics for an `ExEx`.
// One instance is created per `ExExHandle`, labeled with the `ExEx` ID (see `ExExHandle::new`).
// NOTE(review): the `Metrics` derive presumably turns the field doc comments below into the
// metric descriptions — confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "exex")]
struct ExExMetrics {
    /// The total number of notifications sent to an `ExEx`.
    notifications_sent_total: Counter,
    /// The total number of events an `ExEx` has sent to the manager.
    events_sent_total: Counter,
}
71
/// A handle to an `ExEx` used by the [`ExExManager`] to communicate with `ExEx`'s.
///
/// A handle should be created for each `ExEx` with a unique ID. The channels returned by
/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to
/// the manager in [`ExExManager::new`].
#[derive(Debug)]
pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
    /// The execution extension's ID.
    ///
    /// Also used as the `exex` label of this handle's metrics.
    id: String,
    /// Metrics for an `ExEx`.
    metrics: ExExMetrics,
    /// Channel to send [`ExExNotification`]s to the `ExEx`.
    ///
    /// Wraps a bounded channel with a capacity of one notification.
    sender: PollSender<ExExNotification<N>>,
    /// Channel to receive [`ExExEvent`]s from the `ExEx`.
    receiver: UnboundedReceiver<ExExEvent>,
    /// The ID of the next notification to send to this `ExEx`.
    ///
    /// Starts at `0` and is advanced past a notification once that notification has either been
    /// sent to the `ExEx` or skipped as already processed.
    next_notification_id: usize,
    /// The finished block of the `ExEx`.
    ///
    /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
    ///
    /// The manager overwrites this with the height carried by every
    /// [`ExExEvent::FinishedHeight`] event it receives from the `ExEx`.
    finished_height: Option<BlockNumHash>,
}
94
95impl<N: NodePrimitives> ExExHandle<N> {
96    /// Create a new handle for the given `ExEx`.
97    ///
98    /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
99    /// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
100    pub fn new<P, E: ConfigureEvm<Primitives = N>>(
101        id: String,
102        node_head: BlockNumHash,
103        provider: P,
104        evm_config: E,
105        wal_handle: WalHandle<N>,
106    ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
107        let (notification_tx, notification_rx) = mpsc::channel(1);
108        let (event_tx, event_rx) = mpsc::unbounded_channel();
109        let notifications =
110            ExExNotifications::new(node_head, provider, evm_config, notification_rx, wal_handle);
111
112        (
113            Self {
114                id: id.clone(),
115                metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
116                sender: PollSender::new(notification_tx),
117                receiver: event_rx,
118                next_notification_id: 0,
119                finished_height: None,
120            },
121            event_tx,
122            notifications,
123        )
124    }
125
126    /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was
127    /// successfully reserved.
128    ///
129    /// When the notification is sent, it is considered delivered.
130    fn send(
131        &mut self,
132        cx: &mut Context<'_>,
133        (notification_id, notification): &(usize, ExExNotification<N>),
134    ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
135        if let Some(finished_height) = self.finished_height {
136            match notification {
137                ExExNotification::ChainCommitted { new } => {
138                    // Skip the chain commit notification if the finished height of the ExEx is
139                    // higher than or equal to the tip of the new notification.
140                    // I.e., the ExEx has already processed the notification.
141                    if finished_height.number >= new.tip().number() {
142                        debug!(
143                            target: "exex::manager",
144                            exex_id = %self.id,
145                            %notification_id,
146                            ?finished_height,
147                            new_tip = %new.tip().number(),
148                            "Skipping notification"
149                        );
150
151                        self.next_notification_id = notification_id + 1;
152                        return Poll::Ready(Ok(()))
153                    }
154                }
155                // Do not handle [ExExNotification::ChainReorged] and
156                // [ExExNotification::ChainReverted] cases and always send the
157                // notification, because the ExEx should be aware of the reorgs and reverts lower
158                // than its finished height
159                ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
160            }
161        }
162
163        debug!(
164            target: "exex::manager",
165            exex_id = %self.id,
166            %notification_id,
167            "Reserving slot for notification"
168        );
169        match self.sender.poll_reserve(cx) {
170            Poll::Ready(Ok(())) => (),
171            other => return other,
172        }
173
174        debug!(
175            target: "exex::manager",
176            exex_id = %self.id,
177            %notification_id,
178            "Sending notification"
179        );
180        match self.sender.send_item(notification.clone()) {
181            Ok(()) => {
182                self.next_notification_id = notification_id + 1;
183                self.metrics.notifications_sent_total.increment(1);
184                Poll::Ready(Ok(()))
185            }
186            Err(err) => Poll::Ready(Err(err)),
187        }
188    }
189}
190
/// Metrics for the `ExEx` manager.
// `max_capacity` and `num_exexs` are set once in `ExExManager::new`; `current_capacity` and
// `buffer_size` are refreshed by `ExExManager::update_capacity`.
// NOTE(review): the `Metrics` derive presumably turns the field doc comments below into the
// metric descriptions — confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
    /// Max size of the internal state notifications buffer.
    max_capacity: Gauge,
    /// Current capacity of the internal state notifications buffer.
    current_capacity: Gauge,
    /// Current size of the internal state notifications buffer.
    ///
    /// Note that this might be slightly bigger than the maximum capacity in some cases.
    buffer_size: Gauge,
    /// Current number of `ExEx`'s on the node.
    num_exexs: Gauge,
}
206
/// The execution extension manager.
///
/// The manager is responsible for:
///
/// - Receiving relevant events from the rest of the node, and sending these to the execution
///   extensions
/// - Backpressure
/// - Error handling
/// - Monitoring
///
/// The manager is driven by polling it as a [`Future`]; other parts of the node talk to it
/// through the [`ExExManagerHandle`] returned by [`ExExManager::handle`].
#[derive(Debug)]
pub struct ExExManager<P, N: NodePrimitives> {
    /// Provider for querying headers.
    provider: P,

    /// Handles to communicate with the `ExEx`'s.
    exex_handles: Vec<ExExHandle<N>>,

    /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
    handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,

    /// The minimum notification ID currently present in the buffer.
    ///
    /// Refreshed on every poll to the lowest `next_notification_id` among all `ExEx` handles.
    min_id: usize,
    /// Monotonically increasing ID for [`ExExNotification`]s.
    ///
    /// This is the ID that the next pushed notification will receive.
    next_id: usize,
    /// Internal buffer of [`ExExNotification`]s.
    ///
    /// The first element of the tuple is a monotonically increasing ID unique to the notification
    /// (the second element of the tuple).
    buffer: VecDeque<(usize, ExExNotification<N>)>,
    /// Max size of the internal state notifications buffer.
    max_capacity: usize,
    /// Current state notifications buffer capacity.
    ///
    /// Used to inform the execution stage of possible batch sizes.
    ///
    /// Shared with every [`ExExManagerHandle`] and computed as `max_capacity` minus the current
    /// buffer length, saturating at zero.
    current_capacity: Arc<AtomicUsize>,

    /// Whether the manager is ready to receive new notifications.
    ///
    /// Set to `true` whenever the current capacity is greater than zero.
    is_ready: watch::Sender<bool>,

    /// The finished height of all `ExEx`'s.
    ///
    /// Starts as `NoExExs` when the manager has no `ExEx` handles and as `NotReady` otherwise;
    /// once every `ExEx` has reported a finished height it carries the lowest one.
    finished_height: watch::Sender<FinishedExExHeight>,

    /// Write-Ahead Log for the [`ExExNotification`]s.
    wal: Wal<N>,
    /// A stream of finalized headers.
    finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
    /// The threshold for the number of blocks in the WAL before emitting a warning.
    ///
    /// Defaults to [`DEFAULT_WAL_BLOCKS_WARNING`].
    wal_blocks_warning: usize,

    /// A handle to the `ExEx` manager.
    handle: ExExManagerHandle<N>,
    /// Metrics for the `ExEx` manager.
    metrics: ExExManagerMetrics,
}
261
262impl<P, N> ExExManager<P, N>
263where
264    N: NodePrimitives,
265{
266    /// Create a new [`ExExManager`].
267    ///
268    /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
269    /// notification buffer in the manager.
270    ///
271    /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
272    /// notifications over [`ExExManagerHandle`]s until there is capacity again.
273    pub fn new(
274        provider: P,
275        handles: Vec<ExExHandle<N>>,
276        max_capacity: usize,
277        wal: Wal<N>,
278        finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
279    ) -> Self {
280        let num_exexs = handles.len();
281
282        let (handle_tx, handle_rx) = mpsc::unbounded_channel();
283        let (is_ready_tx, is_ready_rx) = watch::channel(true);
284        let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
285            FinishedExExHeight::NoExExs
286        } else {
287            FinishedExExHeight::NotReady
288        });
289
290        let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
291
292        let metrics = ExExManagerMetrics::default();
293        metrics.max_capacity.set(max_capacity as f64);
294        metrics.num_exexs.set(num_exexs as f64);
295
296        Self {
297            provider,
298
299            exex_handles: handles,
300
301            handle_rx,
302
303            min_id: 0,
304            next_id: 0,
305            buffer: VecDeque::with_capacity(max_capacity),
306            max_capacity,
307            current_capacity: Arc::clone(&current_capacity),
308
309            is_ready: is_ready_tx,
310            finished_height: finished_height_tx,
311
312            wal,
313            finalized_header_stream,
314            wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
315
316            handle: ExExManagerHandle {
317                exex_tx: handle_tx,
318                num_exexs,
319                is_ready_receiver: is_ready_rx.clone(),
320                is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
321                current_capacity,
322                finished_height: finished_height_rx,
323            },
324            metrics,
325        }
326    }
327
328    /// Returns the handle to the manager.
329    pub fn handle(&self) -> ExExManagerHandle<N> {
330        self.handle.clone()
331    }
332
333    /// Sets the threshold for the number of blocks in the WAL before emitting a warning.
334    ///
335    /// For L2 chains with faster block times, this value should be increased proportionally
336    /// to avoid excessive warnings. For example, a chain with 2-second block times might use
337    /// a value 6x higher than the default.
338    pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
339        self.wal_blocks_warning = threshold;
340        self
341    }
342
343    /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
344    /// readiness to receive notifications.
345    fn update_capacity(&self) {
346        let capacity = self.max_capacity.saturating_sub(self.buffer.len());
347        self.current_capacity.store(capacity, Ordering::Relaxed);
348        self.metrics.current_capacity.set(capacity as f64);
349        self.metrics.buffer_size.set(self.buffer.len() as f64);
350
351        // we can safely ignore if the channel is closed, since the manager always holds it open
352        // internally
353        let _ = self.is_ready.send(capacity > 0);
354    }
355
356    /// Pushes a new notification into the managers internal buffer, assigning the notification a
357    /// unique ID.
358    fn push_notification(&mut self, notification: ExExNotification<N>) {
359        let next_id = self.next_id;
360        self.buffer.push_back((next_id, notification));
361        self.next_id += 1;
362    }
363}
364
impl<P, N> ExExManager<P, N>
where
    P: HeaderProvider,
    N: NodePrimitives,
{
    /// Finalizes the WAL according to the passed finalized header.
    ///
    /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
    /// necessary.
    ///
    /// An `ExEx` that has not reported a finished height yet counts as not being on the canonical
    /// chain, so nothing is finalized until every `ExEx` has reported one. When the WAL is not
    /// finalized, the offending ExExes are only logged and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider lookup for a finished hash fails or if finalizing the WAL
    /// fails.
    fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
        debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");

        // Check if all ExExes are on the canonical chain
        let exex_finished_heights = self
            .exex_handles
            .iter()
            // Get ID and finished height for each ExEx
            .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
            // Deduplicate all hashes, so that every distinct hash is looked up only once. ExExes
            // without a finished height all share the `None` key and collapse into one entry.
            .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
            // Check if hashes are canonical. A missing finished height is recorded as `false`
            // without querying the provider.
            // NOTE(review): `is_known` is used as the canonicality check here — confirm that it
            // only returns `true` for headers on the canonical chain.
            .map(|(exex_id, num_hash)| {
                num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
                    self.provider
                        .is_known(num_hash.hash)
                        // Save the ExEx ID, finished height, and whether the hash is canonical
                        .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
                })
            })
            // We collect here to be able to log the unfinalized ExExes below
            .collect::<Result<Vec<_>, _>>()?;
        if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
            // If there is a finalized header and all ExExs are on the canonical chain, finalize
            // the WAL with either the lowest finished height among all ExExes, or finalized header
            // – whichever is lower.
            let lowest_finished_height = exex_finished_heights
                .iter()
                .copied()
                .filter_map(|(_, num_hash, _)| num_hash)
                .chain([(finalized_header.num_hash())])
                .min_by_key(|num_hash| num_hash.number)
                // The chained finalized header guarantees at least one element, so the minimum
                // always exists.
                .unwrap();

            self.wal.finalize(lowest_finished_height)?;
            // Even after finalization the WAL may still hold more blocks than the configured
            // threshold; warn so the operator can investigate.
            if self.wal.num_blocks() > self.wal_blocks_warning {
                warn!(
                    target: "exex::manager",
                    blocks = ?self.wal.num_blocks(),
                    threshold = self.wal_blocks_warning,
                    "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
                );
            }
        } else {
            // Because of the deduplication above, this lists one ExEx per distinct finished hash
            // rather than necessarily every lagging ExEx.
            let unfinalized_exexes = exex_finished_heights
                .into_iter()
                .filter_map(|(exex_id, num_hash, is_canonical)| {
                    is_canonical.not().then_some((exex_id, num_hash))
                })
                .format_with(", ", |(exex_id, num_hash), f| {
                    f(&format_args!("{exex_id} = {num_hash:?}"))
                })
                // We need this because `debug!` uses the argument twice when formatting the final
                // log message, but the result of `format_with` can only be used once
                .to_string();
            debug!(
                target: "exex::manager",
                %unfinalized_exexes,
                "Not all ExExes are on the canonical chain, can't finalize the WAL"
            );
        }

        Ok(())
    }
}
439
440impl<P, N> Future for ExExManager<P, N>
441where
442    P: HeaderProvider + Unpin + 'static,
443    N: NodePrimitives,
444{
445    type Output = eyre::Result<()>;
446
447    /// Main loop of the [`ExExManager`]. The order of operations is as follows:
448    /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
449    ///    the latest state of [`ExExEvent::FinishedHeight`] events.
450    /// 2. Finalize the WAL with the finalized header, if necessary.
451    /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
452    ///    the internal buffer capacity.
453    /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new
454    ///    notifications.
455    /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
456    ///    update the internal buffer capacity.
457    /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
458    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
459        let this = self.get_mut();
460
461        // Handle incoming ExEx events
462        for exex in &mut this.exex_handles {
463            while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
464                debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
465                exex.metrics.events_sent_total.increment(1);
466                match event {
467                    ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
468                }
469            }
470        }
471
472        // Drain the finalized header stream and finalize the WAL with the last header
473        let mut last_finalized_header = None;
474        while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
475            last_finalized_header = finalized_header;
476        }
477        if let Some(header) = last_finalized_header {
478            this.finalize_wal(header)?;
479        }
480
481        // Drain handle notifications
482        while this.buffer.len() < this.max_capacity {
483            if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
484                let committed_tip =
485                    notification.committed_chain().map(|chain| chain.tip().number());
486                let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
487                debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
488
489                // Commit to WAL only notifications from blockchain tree. Pipeline notifications
490                // always contain only finalized blocks.
491                match source {
492                    ExExNotificationSource::BlockchainTree => {
493                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
494                        this.wal.commit(&notification)?;
495                    }
496                    ExExNotificationSource::Pipeline => {
497                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
498                    }
499                }
500
501                this.push_notification(notification);
502                continue
503            }
504            break
505        }
506        let buffer_full = this.buffer.len() >= this.max_capacity;
507
508        // Update capacity
509        this.update_capacity();
510
511        // Advance all poll senders
512        let mut min_id = usize::MAX;
513        for idx in (0..this.exex_handles.len()).rev() {
514            let mut exex = this.exex_handles.swap_remove(idx);
515
516            // It is a logic error for this to ever underflow since the manager manages the
517            // notification IDs
518            let notification_index = exex
519                .next_notification_id
520                .checked_sub(this.min_id)
521                .expect("exex expected notification ID outside the manager's range");
522            if let Some(notification) = this.buffer.get(notification_index) &&
523                let Poll::Ready(Err(err)) = exex.send(cx, notification)
524            {
525                // The channel was closed, which is irrecoverable for the manager
526                return Poll::Ready(Err(err.into()))
527            }
528            min_id = min_id.min(exex.next_notification_id);
529            this.exex_handles.push(exex);
530        }
531
532        // Remove processed buffered notifications
533        debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
534        this.buffer.retain(|&(id, _)| id >= min_id);
535        this.min_id = min_id;
536
537        // Update capacity
538        this.update_capacity();
539
540        // If the buffer was full and we made space, we need to wake up to accept new notifications
541        if buffer_full && this.buffer.len() < this.max_capacity {
542            debug!(target: "exex::manager", "Buffer has space again, waking up senders");
543            cx.waker().wake_by_ref();
544        }
545
546        // Update watch channel block number
547        let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
548            exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
549        });
550        if let Ok(finished_height) = finished_height {
551            let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
552        }
553
554        Poll::Pending
555    }
556}
557
/// A handle to communicate with the [`ExExManager`].
///
/// Handles are cheap to clone; every clone gets its own readiness future but shares the
/// notification channel and the capacity counter with the manager.
#[derive(Debug)]
pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
    /// Channel to send notifications to the `ExEx` manager.
    exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
    /// The number of `ExEx`'s running on the node.
    num_exexs: usize,
    /// A watch channel denoting whether the manager is ready for new notifications or not.
    ///
    /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value.
    /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned,
    /// but is otherwise unused.
    is_ready_receiver: watch::Receiver<bool>,
    /// A reusable future that resolves when the manager is ready for new
    /// notifications.
    ///
    /// The future hands the receiver back when it resolves, and `poll_ready` uses it to arm the
    /// next wait.
    is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
    /// The current capacity of the manager's internal notification buffer.
    ///
    /// The manager writes to this counter; the handle only reads it.
    current_capacity: Arc<AtomicUsize>,
    /// The finished height of all `ExEx`'s.
    finished_height: watch::Receiver<FinishedExExHeight>,
}
579
580impl<N: NodePrimitives> ExExManagerHandle<N> {
581    /// Creates an empty manager handle.
582    ///
583    /// Use this if there is no manager present.
584    ///
585    /// The handle will always be ready, and have a capacity of 0.
586    pub fn empty() -> Self {
587        let (exex_tx, _) = mpsc::unbounded_channel();
588        let (_, is_ready_rx) = watch::channel(true);
589        let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
590
591        Self {
592            exex_tx,
593            num_exexs: 0,
594            is_ready_receiver: is_ready_rx.clone(),
595            is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
596            current_capacity: Arc::new(AtomicUsize::new(0)),
597            finished_height: finished_height_rx,
598        }
599    }
600
601    /// Synchronously send a notification over the channel to all execution extensions.
602    ///
603    /// Senders should call [`Self::has_capacity`] first.
604    pub fn send(
605        &self,
606        source: ExExNotificationSource,
607        notification: ExExNotification<N>,
608    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
609        self.exex_tx.send((source, notification))
610    }
611
612    /// Asynchronously send a notification over the channel to all execution extensions.
613    ///
614    /// The returned future resolves when the notification has been delivered. If there is no
615    /// capacity in the channel, the future will wait.
616    pub async fn send_async(
617        &mut self,
618        source: ExExNotificationSource,
619        notification: ExExNotification<N>,
620    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
621        self.ready().await;
622        self.exex_tx.send((source, notification))
623    }
624
625    /// Get the current capacity of the `ExEx` manager's internal notification buffer.
626    pub fn capacity(&self) -> usize {
627        self.current_capacity.load(Ordering::Relaxed)
628    }
629
630    /// Whether there is capacity in the `ExEx` manager's internal notification buffer.
631    ///
632    /// If this returns `false`, the owner of the handle should **NOT** send new notifications over
633    /// the channel until the manager is ready again, as this can lead to unbounded memory growth.
634    pub fn has_capacity(&self) -> bool {
635        self.capacity() > 0
636    }
637
638    /// Returns `true` if there are `ExEx`'s installed in the node.
639    pub const fn has_exexs(&self) -> bool {
640        self.num_exexs > 0
641    }
642
643    /// The finished height of all `ExEx`'s.
644    pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
645        self.finished_height.clone()
646    }
647
648    /// Wait until the manager is ready for new notifications.
649    pub async fn ready(&mut self) {
650        poll_fn(|cx| self.poll_ready(cx)).await
651    }
652
653    /// Wait until the manager is ready for new notifications.
654    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
655        let rx = ready!(self.is_ready.poll(cx));
656        self.is_ready.set(make_wait_future(rx));
657        Poll::Ready(())
658    }
659}
660
661/// Creates a future that resolves once the given watch channel receiver is true.
662async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
663    // NOTE(onbjerg): We can ignore the error here, because if the channel is closed, the node
664    // is shutting down.
665    let _ = rx.wait_for(|ready| *ready).await;
666    rx
667}
668
669impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
670    fn clone(&self) -> Self {
671        Self {
672            exex_tx: self.exex_tx.clone(),
673            num_exexs: self.num_exexs,
674            is_ready_receiver: self.is_ready_receiver.clone(),
675            is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
676            current_capacity: self.current_capacity.clone(),
677            finished_height: self.finished_height.clone(),
678        }
679    }
680}
681
682#[cfg(test)]
683mod tests {
684    use super::*;
685    use crate::wal::WalResult;
686    use alloy_primitives::B256;
687    use futures::{StreamExt, TryStreamExt};
688    use rand::Rng;
689    use reth_db_common::init::init_genesis;
690    use reth_evm_ethereum::EthEvmConfig;
691    use reth_primitives_traits::RecoveredBlock;
692    use reth_provider::{
693        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
694        BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
695    };
696    use reth_testing_utils::generators::{self, random_block, BlockParams};
697
698    fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
699        let (tx, rx) = watch::channel(None);
700        // Do not drop the sender, otherwise the receiver will always return an error
701        std::mem::forget(tx);
702        ForkChoiceStream::new(rx)
703    }
704
705    #[tokio::test]
706    async fn test_delivers_events() {
707        let temp_dir = tempfile::tempdir().unwrap();
708        let wal = Wal::new(temp_dir.path()).unwrap();
709
710        let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
711            "test_exex".to_string(),
712            Default::default(),
713            (),
714            EthEvmConfig::mainnet(),
715            wal.handle(),
716        );
717
718        // Send an event and check that it's delivered correctly
719        let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
720        event_tx.send(event).unwrap();
721        let received_event = exex_handle.receiver.recv().await.unwrap();
722        assert_eq!(received_event, event);
723    }
724
725    #[tokio::test]
726    async fn test_has_exexs() {
727        let temp_dir = tempfile::tempdir().unwrap();
728        let wal = Wal::new(temp_dir.path()).unwrap();
729
730        let (exex_handle_1, _, _) = ExExHandle::new(
731            "test_exex_1".to_string(),
732            Default::default(),
733            (),
734            EthEvmConfig::mainnet(),
735            wal.handle(),
736        );
737
738        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
739            .handle
740            .has_exexs());
741
742        assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
743            .handle
744            .has_exexs());
745    }
746
747    #[tokio::test]
748    async fn test_has_capacity() {
749        let temp_dir = tempfile::tempdir().unwrap();
750        let wal = Wal::new(temp_dir.path()).unwrap();
751
752        let (exex_handle_1, _, _) = ExExHandle::new(
753            "test_exex_1".to_string(),
754            Default::default(),
755            (),
756            EthEvmConfig::mainnet(),
757            wal.handle(),
758        );
759
760        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
761            .handle
762            .has_capacity());
763
764        assert!(ExExManager::new(
765            (),
766            vec![exex_handle_1],
767            10,
768            wal,
769            empty_finalized_header_stream()
770        )
771        .handle
772        .has_capacity());
773    }
774
775    #[test]
776    fn test_push_notification() {
777        let temp_dir = tempfile::tempdir().unwrap();
778        let wal = Wal::new(temp_dir.path()).unwrap();
779
780        let (exex_handle, _, _) = ExExHandle::new(
781            "test_exex".to_string(),
782            Default::default(),
783            (),
784            EthEvmConfig::mainnet(),
785            wal.handle(),
786        );
787
788        // Create a mock ExExManager and add the exex_handle to it
789        let mut exex_manager =
790            ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
791
792        // Define the notification for testing
793        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
794        block1.set_hash(B256::new([0x01; 32]));
795        block1.set_block_number(10);
796
797        let notification1 = ExExNotification::ChainCommitted {
798            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
799        };
800
801        // Push the first notification
802        exex_manager.push_notification(notification1.clone());
803
804        // Verify the buffer contains the notification with the correct ID
805        assert_eq!(exex_manager.buffer.len(), 1);
806        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
807        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
808        assert_eq!(exex_manager.next_id, 1);
809
810        // Push another notification
811        let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
812        block2.set_hash(B256::new([0x02; 32]));
813        block2.set_block_number(20);
814
815        let notification2 = ExExNotification::ChainCommitted {
816            new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
817        };
818
819        exex_manager.push_notification(notification2.clone());
820
821        // Verify the buffer contains both notifications with correct IDs
822        assert_eq!(exex_manager.buffer.len(), 2);
823        assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
824        assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
825        assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
826        assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
827        assert_eq!(exex_manager.next_id, 2);
828    }
829
830    #[test]
831    fn test_update_capacity() {
832        let temp_dir = tempfile::tempdir().unwrap();
833        let wal = Wal::new(temp_dir.path()).unwrap();
834
835        let (exex_handle, _, _) = ExExHandle::new(
836            "test_exex".to_string(),
837            Default::default(),
838            (),
839            EthEvmConfig::mainnet(),
840            wal.handle(),
841        );
842
843        // Create a mock ExExManager and add the exex_handle to it
844        let max_capacity = 5;
845        let mut exex_manager = ExExManager::new(
846            (),
847            vec![exex_handle],
848            max_capacity,
849            wal,
850            empty_finalized_header_stream(),
851        );
852
853        // Push some notifications to fill part of the buffer
854        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
855        block1.set_hash(B256::new([0x01; 32]));
856        block1.set_block_number(10);
857
858        let notification1 = ExExNotification::ChainCommitted {
859            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
860        };
861
862        exex_manager.push_notification(notification1.clone());
863        exex_manager.push_notification(notification1);
864
865        // Update capacity
866        exex_manager.update_capacity();
867
868        // Verify current capacity and metrics
869        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);
870
871        // Clear the buffer and update capacity
872        exex_manager.buffer.clear();
873        exex_manager.update_capacity();
874
875        // Verify current capacity
876        assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
877    }
878
879    #[tokio::test]
880    async fn test_updates_block_height() {
881        let temp_dir = tempfile::tempdir().unwrap();
882        let wal = Wal::new(temp_dir.path()).unwrap();
883
884        let provider_factory = create_test_provider_factory();
885
886        let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
887            "test_exex".to_string(),
888            Default::default(),
889            (),
890            EthEvmConfig::mainnet(),
891            wal.handle(),
892        );
893
894        // Check initial block height
895        assert!(exex_handle.finished_height.is_none());
896
897        // Update the block height via an event
898        let block = BlockNumHash::new(42, B256::random());
899        event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
900
901        // Create a mock ExExManager and add the exex_handle to it
902        let exex_manager = ExExManager::new(
903            provider_factory,
904            vec![exex_handle],
905            10,
906            Wal::new(temp_dir.path()).unwrap(),
907            empty_finalized_header_stream(),
908        );
909
910        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
911
912        // Pin the ExExManager to call the poll method
913        let mut pinned_manager = std::pin::pin!(exex_manager);
914        let _ = pinned_manager.as_mut().poll(&mut cx);
915
916        // Check that the block height was updated
917        let updated_exex_handle = &pinned_manager.exex_handles[0];
918        assert_eq!(updated_exex_handle.finished_height, Some(block));
919
920        // Get the receiver for the finished height
921        let mut receiver = pinned_manager.handle.finished_height();
922
923        // Wait for a new value to be sent
924        receiver.changed().await.unwrap();
925
926        // Get the latest value
927        let finished_height = *receiver.borrow();
928
929        // The finished height should be updated to the lower block height
930        assert_eq!(finished_height, FinishedExExHeight::Height(42));
931    }
932
933    #[tokio::test]
934    async fn test_updates_block_height_lower() {
935        let temp_dir = tempfile::tempdir().unwrap();
936        let wal = Wal::new(temp_dir.path()).unwrap();
937
938        let provider_factory = create_test_provider_factory();
939
940        // Create two `ExExHandle` instances
941        let (exex_handle1, event_tx1, _) = ExExHandle::new(
942            "test_exex1".to_string(),
943            Default::default(),
944            (),
945            EthEvmConfig::mainnet(),
946            wal.handle(),
947        );
948        let (exex_handle2, event_tx2, _) = ExExHandle::new(
949            "test_exex2".to_string(),
950            Default::default(),
951            (),
952            EthEvmConfig::mainnet(),
953            wal.handle(),
954        );
955
956        let block1 = BlockNumHash::new(42, B256::random());
957        let block2 = BlockNumHash::new(10, B256::random());
958
959        // Send events to update the block heights of the two handles, with the second being lower
960        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
961        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
962
963        let exex_manager = ExExManager::new(
964            provider_factory,
965            vec![exex_handle1, exex_handle2],
966            10,
967            Wal::new(temp_dir.path()).unwrap(),
968            empty_finalized_header_stream(),
969        );
970
971        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
972
973        let mut pinned_manager = std::pin::pin!(exex_manager);
974
975        let _ = pinned_manager.as_mut().poll(&mut cx);
976
977        // Get the receiver for the finished height
978        let mut receiver = pinned_manager.handle.finished_height();
979
980        // Wait for a new value to be sent
981        receiver.changed().await.unwrap();
982
983        // Get the latest value
984        let finished_height = *receiver.borrow();
985
986        // The finished height should be updated to the lower block height
987        assert_eq!(finished_height, FinishedExExHeight::Height(10));
988    }
989
990    #[tokio::test]
991    async fn test_updates_block_height_greater() {
992        let temp_dir = tempfile::tempdir().unwrap();
993        let wal = Wal::new(temp_dir.path()).unwrap();
994
995        let provider_factory = create_test_provider_factory();
996
997        // Create two `ExExHandle` instances
998        let (exex_handle1, event_tx1, _) = ExExHandle::new(
999            "test_exex1".to_string(),
1000            Default::default(),
1001            (),
1002            EthEvmConfig::mainnet(),
1003            wal.handle(),
1004        );
1005        let (exex_handle2, event_tx2, _) = ExExHandle::new(
1006            "test_exex2".to_string(),
1007            Default::default(),
1008            (),
1009            EthEvmConfig::mainnet(),
1010            wal.handle(),
1011        );
1012
1013        // Assert that the initial block height is `None` for the first `ExExHandle`.
1014        assert!(exex_handle1.finished_height.is_none());
1015
1016        let block1 = BlockNumHash::new(42, B256::random());
1017        let block2 = BlockNumHash::new(100, B256::random());
1018
1019        // Send events to update the block heights of the two handles, with the second being higher.
1020        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
1021        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
1022
1023        let exex_manager = ExExManager::new(
1024            provider_factory,
1025            vec![exex_handle1, exex_handle2],
1026            10,
1027            Wal::new(temp_dir.path()).unwrap(),
1028            empty_finalized_header_stream(),
1029        );
1030
1031        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1032
1033        let mut pinned_manager = std::pin::pin!(exex_manager);
1034
1035        let _ = pinned_manager.as_mut().poll(&mut cx);
1036
1037        // Get the receiver for the finished height
1038        let mut receiver = pinned_manager.handle.finished_height();
1039
1040        // Wait for a new value to be sent
1041        receiver.changed().await.unwrap();
1042
1043        // Get the latest value
1044        let finished_height = *receiver.borrow();
1045
1046        // The finished height should be updated to the lower block height
1047        assert_eq!(finished_height, FinishedExExHeight::Height(42));
1048
1049        // // The lower block height should be retained
1050        // let updated_exex_handle = &pinned_manager.exex_handles[0];
1051        // assert_eq!(updated_exex_handle.finished_height, Some(42));
1052    }
1053
1054    #[tokio::test]
1055    async fn test_exex_manager_capacity() {
1056        let temp_dir = tempfile::tempdir().unwrap();
1057        let wal = Wal::new(temp_dir.path()).unwrap();
1058
1059        let provider_factory = create_test_provider_factory();
1060
1061        let (exex_handle_1, _, _) = ExExHandle::new(
1062            "test_exex_1".to_string(),
1063            Default::default(),
1064            (),
1065            EthEvmConfig::mainnet(),
1066            wal.handle(),
1067        );
1068
1069        // Create an ExExManager with a small max capacity
1070        let max_capacity = 2;
1071        let exex_manager = ExExManager::new(
1072            provider_factory,
1073            vec![exex_handle_1],
1074            max_capacity,
1075            Wal::new(temp_dir.path()).unwrap(),
1076            empty_finalized_header_stream(),
1077        );
1078
1079        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1080
1081        // Setup a notification
1082        let notification = ExExNotification::ChainCommitted {
1083            new: Arc::new(Chain::new(
1084                vec![Default::default()],
1085                Default::default(),
1086                Default::default(),
1087            )),
1088        };
1089
1090        // Send notifications to go over the max capacity
1091        exex_manager
1092            .handle
1093            .exex_tx
1094            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1095            .unwrap();
1096        exex_manager
1097            .handle
1098            .exex_tx
1099            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1100            .unwrap();
1101        exex_manager
1102            .handle
1103            .exex_tx
1104            .send((ExExNotificationSource::BlockchainTree, notification))
1105            .unwrap();
1106
1107        // Pin the ExExManager to call the poll method
1108        let mut pinned_manager = std::pin::pin!(exex_manager);
1109
1110        // Before polling, the next notification ID should be 0 and the buffer should be empty
1111        assert_eq!(pinned_manager.next_id, 0);
1112        assert_eq!(pinned_manager.buffer.len(), 0);
1113
1114        let _ = pinned_manager.as_mut().poll(&mut cx);
1115
1116        // After polling, the next notification ID and buffer size should be updated
1117        assert_eq!(pinned_manager.next_id, 2);
1118        assert_eq!(pinned_manager.buffer.len(), 2);
1119    }
1120
1121    #[tokio::test]
1122    async fn exex_handle_new() {
1123        let provider_factory = create_test_provider_factory();
1124        init_genesis(&provider_factory).unwrap();
1125        let provider = BlockchainProvider::new(provider_factory).unwrap();
1126
1127        let temp_dir = tempfile::tempdir().unwrap();
1128        let wal = Wal::new(temp_dir.path()).unwrap();
1129
1130        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1131            "test_exex".to_string(),
1132            Default::default(),
1133            provider,
1134            EthEvmConfig::mainnet(),
1135            wal.handle(),
1136        );
1137
1138        // Check initial state
1139        assert_eq!(exex_handle.id, "test_exex");
1140        assert_eq!(exex_handle.next_notification_id, 0);
1141
1142        // Setup two blocks for the chain commit notification
1143        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1144        block1.set_hash(B256::new([0x01; 32]));
1145        block1.set_block_number(10);
1146
1147        let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1148        block2.set_hash(B256::new([0x02; 32]));
1149        block2.set_block_number(11);
1150
1151        // Setup a notification
1152        let notification = ExExNotification::ChainCommitted {
1153            new: Arc::new(Chain::new(
1154                vec![Default::default()],
1155                Default::default(),
1156                Default::default(),
1157            )),
1158        };
1159
1160        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1161
1162        // Send a notification and ensure it's received correctly
1163        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1164            Poll::Ready(Ok(())) => {
1165                let received_notification = notifications.next().await.unwrap().unwrap();
1166                assert_eq!(received_notification, notification);
1167            }
1168            Poll::Pending => panic!("Notification send is pending"),
1169            Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1170        }
1171
1172        // Ensure the notification ID was incremented
1173        assert_eq!(exex_handle.next_notification_id, 23);
1174    }
1175
1176    #[tokio::test]
1177    async fn test_notification_if_finished_height_gt_chain_tip() {
1178        let provider_factory = create_test_provider_factory();
1179        init_genesis(&provider_factory).unwrap();
1180        let provider = BlockchainProvider::new(provider_factory).unwrap();
1181
1182        let temp_dir = tempfile::tempdir().unwrap();
1183        let wal = Wal::new(temp_dir.path()).unwrap();
1184
1185        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1186            "test_exex".to_string(),
1187            Default::default(),
1188            provider,
1189            EthEvmConfig::mainnet(),
1190            wal.handle(),
1191        );
1192
1193        // Set finished_height to a value higher than the block tip
1194        exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1195
1196        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1197        block1.set_hash(B256::new([0x01; 32]));
1198        block1.set_block_number(10);
1199
1200        let notification = ExExNotification::ChainCommitted {
1201            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1202        };
1203
1204        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1205
1206        // Send the notification
1207        match exex_handle.send(&mut cx, &(22, notification)) {
1208            Poll::Ready(Ok(())) => {
1209                poll_fn(|cx| {
1210                    // The notification should be skipped, so nothing should be sent.
1211                    // Check that the receiver channel is indeed empty
1212                    assert!(notifications.poll_next_unpin(cx).is_pending());
1213                    Poll::Ready(())
1214                })
1215                .await;
1216            }
1217            Poll::Pending | Poll::Ready(Err(_)) => {
1218                panic!("Notification should not be pending or fail");
1219            }
1220        }
1221
1222        // Ensure the notification ID was still incremented
1223        assert_eq!(exex_handle.next_notification_id, 23);
1224    }
1225
1226    #[tokio::test]
1227    async fn test_sends_chain_reorged_notification() {
1228        let provider_factory = create_test_provider_factory();
1229        init_genesis(&provider_factory).unwrap();
1230        let provider = BlockchainProvider::new(provider_factory).unwrap();
1231
1232        let temp_dir = tempfile::tempdir().unwrap();
1233        let wal = Wal::new(temp_dir.path()).unwrap();
1234
1235        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1236            "test_exex".to_string(),
1237            Default::default(),
1238            provider,
1239            EthEvmConfig::mainnet(),
1240            wal.handle(),
1241        );
1242
1243        let notification = ExExNotification::ChainReorged {
1244            old: Arc::new(Chain::default()),
1245            new: Arc::new(Chain::default()),
1246        };
1247
1248        // Even if the finished height is higher than the tip of the new chain, the reorg
1249        // notification should be received
1250        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1251
1252        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1253
1254        // Send the notification
1255        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1256            Poll::Ready(Ok(())) => {
1257                let received_notification = notifications.next().await.unwrap().unwrap();
1258                assert_eq!(received_notification, notification);
1259            }
1260            Poll::Pending | Poll::Ready(Err(_)) => {
1261                panic!("Notification should not be pending or fail")
1262            }
1263        }
1264
1265        // Ensure the notification ID was incremented
1266        assert_eq!(exex_handle.next_notification_id, 23);
1267    }
1268
1269    #[tokio::test]
1270    async fn test_sends_chain_reverted_notification() {
1271        let provider_factory = create_test_provider_factory();
1272        init_genesis(&provider_factory).unwrap();
1273        let provider = BlockchainProvider::new(provider_factory).unwrap();
1274
1275        let temp_dir = tempfile::tempdir().unwrap();
1276        let wal = Wal::new(temp_dir.path()).unwrap();
1277
1278        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1279            "test_exex".to_string(),
1280            Default::default(),
1281            provider,
1282            EthEvmConfig::mainnet(),
1283            wal.handle(),
1284        );
1285
1286        let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1287
1288        // Even if the finished height is higher than the tip of the new chain, the reorg
1289        // notification should be received
1290        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1291
1292        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1293
1294        // Send the notification
1295        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1296            Poll::Ready(Ok(())) => {
1297                let received_notification = notifications.next().await.unwrap().unwrap();
1298                assert_eq!(received_notification, notification);
1299            }
1300            Poll::Pending | Poll::Ready(Err(_)) => {
1301                panic!("Notification should not be pending or fail")
1302            }
1303        }
1304
1305        // Ensure the notification ID was incremented
1306        assert_eq!(exex_handle.next_notification_id, 23);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_exex_wal() -> eyre::Result<()> {
1311        reth_tracing::init_test_tracing();
1312
1313        let mut rng = generators::rng();
1314
1315        let provider_factory = create_test_provider_factory();
1316        let genesis_hash = init_genesis(&provider_factory).unwrap();
1317        let genesis_block = provider_factory
1318            .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1319            .unwrap()
1320            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1321
1322        let block = random_block(
1323            &mut rng,
1324            genesis_block.number + 1,
1325            BlockParams { parent: Some(genesis_hash), ..Default::default() },
1326        )
1327        .try_recover()
1328        .unwrap();
1329        let provider_rw = provider_factory.database_provider_rw().unwrap();
1330        provider_rw.insert_block(&block).unwrap();
1331        provider_rw.commit().unwrap();
1332
1333        let provider = BlockchainProvider::new(provider_factory).unwrap();
1334
1335        let temp_dir = tempfile::tempdir().unwrap();
1336        let wal = Wal::new(temp_dir.path()).unwrap();
1337
1338        let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1339            "test_exex".to_string(),
1340            Default::default(),
1341            provider.clone(),
1342            EthEvmConfig::mainnet(),
1343            wal.handle(),
1344        );
1345
1346        let genesis_notification = ExExNotification::ChainCommitted {
1347            new: Arc::new(Chain::new(
1348                vec![genesis_block.clone()],
1349                Default::default(),
1350                Default::default(),
1351            )),
1352        };
1353        let notification = ExExNotification::ChainCommitted {
1354            new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
1355        };
1356
1357        let (finalized_headers_tx, rx) = watch::channel(None);
1358        finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1359        let finalized_header_stream = ForkChoiceStream::new(rx);
1360
1361        let mut exex_manager = std::pin::pin!(ExExManager::new(
1362            provider,
1363            vec![exex_handle],
1364            2,
1365            wal,
1366            finalized_header_stream
1367        ));
1368
1369        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1370
1371        exex_manager
1372            .handle()
1373            .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1374        exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1375
1376        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1377        assert_eq!(
1378            notifications.try_poll_next_unpin(&mut cx)?,
1379            Poll::Ready(Some(genesis_notification))
1380        );
1381        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1382        assert_eq!(
1383            notifications.try_poll_next_unpin(&mut cx)?,
1384            Poll::Ready(Some(notification.clone()))
1385        );
1386        // WAL shouldn't contain the genesis notification, because it's finalized
1387        assert_eq!(
1388            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1389            std::slice::from_ref(&notification)
1390        );
1391
1392        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1393        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1394        // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
1395        assert_eq!(
1396            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1397            std::slice::from_ref(&notification)
1398        );
1399
1400        // Send a `FinishedHeight` event with a non-canonical block
1401        events_tx
1402            .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1403            .unwrap();
1404
1405        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1406        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1407        // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
1408        // non-canonical block
1409        assert_eq!(
1410            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1411            std::slice::from_ref(&notification)
1412        );
1413
1414        // Send a `FinishedHeight` event with a canonical block
1415        events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1416
1417        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1418        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1419        // WAL is finalized
1420        assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1421
1422        Ok(())
1423    }
1424
1425    #[tokio::test]
1426    async fn test_deadlock_manager_wakes_after_buffer_clears() {
1427        // This test simulates the scenario where the buffer fills up, ingestion pauses,
1428        // and then space clears. We verify the manager wakes up to process pending items.
1429
1430        let temp_dir = tempfile::tempdir().unwrap();
1431        let wal = Wal::new(temp_dir.path()).unwrap();
1432        let provider_factory = create_test_provider_factory();
1433        init_genesis(&provider_factory).unwrap();
1434        let provider = BlockchainProvider::new(provider_factory.clone()).unwrap();
1435
1436        // 1. Setup Manager with Capacity = 1
1437        let (exex_handle, _, mut notifications) = ExExHandle::new(
1438            "test_exex".to_string(),
1439            Default::default(),
1440            provider,
1441            EthEvmConfig::mainnet(),
1442            wal.handle(),
1443        );
1444
1445        let max_capacity = 2;
1446        let exex_manager = ExExManager::new(
1447            provider_factory,
1448            vec![exex_handle],
1449            max_capacity,
1450            wal,
1451            empty_finalized_header_stream(),
1452        );
1453
1454        let manager_handle = exex_manager.handle();
1455
1456        // Spawn manager in background so it runs continuously
1457        tokio::spawn(async move {
1458            exex_manager.await.ok();
1459        });
1460
1461        // Helper to create notifications
1462        let mut rng = generators::rng();
1463        let mut make_notif = |id: u64| {
1464            let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
1465            ExExNotification::ChainCommitted {
1466                new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
1467            }
1468        };
1469
1470        manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap();
1471
1472        // Send the "Stuck" Item (Notification #100).
1473        // At this point, the Manager loop has skipped the ingestion logic because buffer is full
1474        // (buffer_full=true). This item sits in the unbounded 'handle_rx' channel waiting.
1475        manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap();
1476
1477        // 3. Relieve Pressure
1478        // We consume items from the ExEx.
1479        // As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager
1480        // frees space. Once Manager frees space, the FIX (wake_by_ref) should trigger,
1481        // causing it to read Notif #100.
1482
1483        // Consume the jam
1484        let _ = notifications.next().await.unwrap();
1485
1486        // 4. Assert No Deadlock
1487        // We expect Notification #100 next.
1488        // If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping
1489        // despite having empty buffer.
1490        let result =
1491            tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await;
1492
1493        assert!(
1494            result.is_ok(),
1495            "Deadlock detected! Manager failed to wake up and process Pending Item #100."
1496        );
1497    }
1498}