reth_exex/
manager.rs

1use crate::{
2    wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
3};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockNumHash;
6use futures::StreamExt;
7use itertools::Itertools;
8use metrics::Gauge;
9use reth_chain_state::ForkChoiceStream;
10use reth_ethereum_primitives::EthPrimitives;
11use reth_evm::execute::BlockExecutorProvider;
12use reth_metrics::{metrics::Counter, Metrics};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::SealedHeader;
15use reth_provider::HeaderProvider;
16use reth_tracing::tracing::{debug, warn};
17use std::{
18    collections::VecDeque,
19    fmt::Debug,
20    future::{poll_fn, Future},
21    ops::Not,
22    pin::Pin,
23    sync::{
24        atomic::{AtomicUsize, Ordering},
25        Arc,
26    },
27    task::{ready, Context, Poll},
28};
29use tokio::sync::{
30    mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
31    watch,
32};
33use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
34
/// Default upper bound on the number of notifications kept in the manager's internal buffer.
///
/// A buffer of 1024 notifications corresponds to about 3.5 hours of mainnet blocks,
/// or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
40
/// Number of blocks the WAL may hold before a warning is emitted.
///
/// This is the threshold for the size of the Write-Ahead Log (WAL): once the WAL contains more
/// blocks than this, a warning is logged to point at a potential problem.
pub const WAL_BLOCKS_WARNING: usize = 128;
46
/// Where a notification came from.
///
/// The distinction is needed so that pipeline notifications are never committed to the
/// [WAL](`Wal`), because the blocks they carry are already finalized.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExExNotificationSource {
    /// The pipeline produced the notification.
    Pipeline,
    /// The blockchain tree produced the notification.
    BlockchainTree,
}
58
59/// Metrics for an `ExEx`.
60#[derive(Metrics)]
61#[metrics(scope = "exex")]
62struct ExExMetrics {
63    /// The total number of notifications sent to an `ExEx`.
64    notifications_sent_total: Counter,
65    /// The total number of events an `ExEx` has sent to the manager.
66    events_sent_total: Counter,
67}
68
69/// A handle to an `ExEx` used by the [`ExExManager`] to communicate with `ExEx`'s.
70///
71/// A handle should be created for each `ExEx` with a unique ID. The channels returned by
72/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to
73/// the manager in [`ExExManager::new`].
74#[derive(Debug)]
75pub struct ExExHandle<N: NodePrimitives = EthPrimitives> {
76    /// The execution extension's ID.
77    id: String,
78    /// Metrics for an `ExEx`.
79    metrics: ExExMetrics,
80    /// Channel to send [`ExExNotification`]s to the `ExEx`.
81    sender: PollSender<ExExNotification<N>>,
82    /// Channel to receive [`ExExEvent`]s from the `ExEx`.
83    receiver: UnboundedReceiver<ExExEvent>,
84    /// The ID of the next notification to send to this `ExEx`.
85    next_notification_id: usize,
86    /// The finished block of the `ExEx`.
87    ///
88    /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event.
89    finished_height: Option<BlockNumHash>,
90}
91
92impl<N: NodePrimitives> ExExHandle<N> {
93    /// Create a new handle for the given `ExEx`.
94    ///
95    /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a
96    /// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`.
97    pub fn new<P, E: BlockExecutorProvider<Primitives = N>>(
98        id: String,
99        node_head: BlockNumHash,
100        provider: P,
101        executor: E,
102        wal_handle: WalHandle<N>,
103    ) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
104        let (notification_tx, notification_rx) = mpsc::channel(1);
105        let (event_tx, event_rx) = mpsc::unbounded_channel();
106        let notifications =
107            ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
108
109        (
110            Self {
111                id: id.clone(),
112                metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
113                sender: PollSender::new(notification_tx),
114                receiver: event_rx,
115                next_notification_id: 0,
116                finished_height: None,
117            },
118            event_tx,
119            notifications,
120        )
121    }
122
123    /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was
124    /// successfully reserved.
125    ///
126    /// When the notification is sent, it is considered delivered.
127    fn send(
128        &mut self,
129        cx: &mut Context<'_>,
130        (notification_id, notification): &(usize, ExExNotification<N>),
131    ) -> Poll<Result<(), PollSendError<ExExNotification<N>>>> {
132        if let Some(finished_height) = self.finished_height {
133            match notification {
134                ExExNotification::ChainCommitted { new } => {
135                    // Skip the chain commit notification if the finished height of the ExEx is
136                    // higher than or equal to the tip of the new notification.
137                    // I.e., the ExEx has already processed the notification.
138                    if finished_height.number >= new.tip().number() {
139                        debug!(
140                            target: "exex::manager",
141                            exex_id = %self.id,
142                            %notification_id,
143                            ?finished_height,
144                            new_tip = %new.tip().number(),
145                            "Skipping notification"
146                        );
147
148                        self.next_notification_id = notification_id + 1;
149                        return Poll::Ready(Ok(()))
150                    }
151                }
152                // Do not handle [ExExNotification::ChainReorged] and
153                // [ExExNotification::ChainReverted] cases and always send the
154                // notification, because the ExEx should be aware of the reorgs and reverts lower
155                // than its finished height
156                ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
157            }
158        }
159
160        debug!(
161            target: "exex::manager",
162            exex_id = %self.id,
163            %notification_id,
164            "Reserving slot for notification"
165        );
166        match self.sender.poll_reserve(cx) {
167            Poll::Ready(Ok(())) => (),
168            other => return other,
169        }
170
171        debug!(
172            target: "exex::manager",
173            exex_id = %self.id,
174            %notification_id,
175            "Sending notification"
176        );
177        match self.sender.send_item(notification.clone()) {
178            Ok(()) => {
179                self.next_notification_id = notification_id + 1;
180                self.metrics.notifications_sent_total.increment(1);
181                Poll::Ready(Ok(()))
182            }
183            Err(err) => Poll::Ready(Err(err)),
184        }
185    }
186}
187
188/// Metrics for the `ExEx` manager.
189#[derive(Metrics)]
190#[metrics(scope = "exex.manager")]
191pub struct ExExManagerMetrics {
192    /// Max size of the internal state notifications buffer.
193    max_capacity: Gauge,
194    /// Current capacity of the internal state notifications buffer.
195    current_capacity: Gauge,
196    /// Current size of the internal state notifications buffer.
197    ///
198    /// Note that this might be slightly bigger than the maximum capacity in some cases.
199    buffer_size: Gauge,
200    /// Current number of `ExEx`'s on the node.
201    num_exexs: Gauge,
202}
203
204/// The execution extension manager.
205///
206/// The manager is responsible for:
207///
208/// - Receiving relevant events from the rest of the node, and sending these to the execution
209///   extensions
210/// - Backpressure
211/// - Error handling
212/// - Monitoring
213#[derive(Debug)]
214pub struct ExExManager<P, N: NodePrimitives> {
215    /// Provider for querying headers.
216    provider: P,
217
218    /// Handles to communicate with the `ExEx`'s.
219    exex_handles: Vec<ExExHandle<N>>,
220
221    /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
222    handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification<N>)>,
223
224    /// The minimum notification ID currently present in the buffer.
225    min_id: usize,
226    /// Monotonically increasing ID for [`ExExNotification`]s.
227    next_id: usize,
228    /// Internal buffer of [`ExExNotification`]s.
229    ///
230    /// The first element of the tuple is a monotonically increasing ID unique to the notification
231    /// (the second element of the tuple).
232    buffer: VecDeque<(usize, ExExNotification<N>)>,
233    /// Max size of the internal state notifications buffer.
234    max_capacity: usize,
235    /// Current state notifications buffer capacity.
236    ///
237    /// Used to inform the execution stage of possible batch sizes.
238    current_capacity: Arc<AtomicUsize>,
239
240    /// Whether the manager is ready to receive new notifications.
241    is_ready: watch::Sender<bool>,
242
243    /// The finished height of all `ExEx`'s.
244    finished_height: watch::Sender<FinishedExExHeight>,
245
246    /// Write-Ahead Log for the [`ExExNotification`]s.
247    wal: Wal<N>,
248    /// A stream of finalized headers.
249    finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
250
251    /// A handle to the `ExEx` manager.
252    handle: ExExManagerHandle<N>,
253    /// Metrics for the `ExEx` manager.
254    metrics: ExExManagerMetrics,
255}
256
257impl<P, N> ExExManager<P, N>
258where
259    N: NodePrimitives,
260{
261    /// Create a new [`ExExManager`].
262    ///
263    /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
264    /// notification buffer in the manager.
265    ///
266    /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
267    /// notifications over [`ExExManagerHandle`]s until there is capacity again.
268    pub fn new(
269        provider: P,
270        handles: Vec<ExExHandle<N>>,
271        max_capacity: usize,
272        wal: Wal<N>,
273        finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
274    ) -> Self {
275        let num_exexs = handles.len();
276
277        let (handle_tx, handle_rx) = mpsc::unbounded_channel();
278        let (is_ready_tx, is_ready_rx) = watch::channel(true);
279        let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
280            FinishedExExHeight::NoExExs
281        } else {
282            FinishedExExHeight::NotReady
283        });
284
285        let current_capacity = Arc::new(AtomicUsize::new(max_capacity));
286
287        let metrics = ExExManagerMetrics::default();
288        metrics.max_capacity.set(max_capacity as f64);
289        metrics.num_exexs.set(num_exexs as f64);
290
291        Self {
292            provider,
293
294            exex_handles: handles,
295
296            handle_rx,
297
298            min_id: 0,
299            next_id: 0,
300            buffer: VecDeque::with_capacity(max_capacity),
301            max_capacity,
302            current_capacity: Arc::clone(&current_capacity),
303
304            is_ready: is_ready_tx,
305            finished_height: finished_height_tx,
306
307            wal,
308            finalized_header_stream,
309
310            handle: ExExManagerHandle {
311                exex_tx: handle_tx,
312                num_exexs,
313                is_ready_receiver: is_ready_rx.clone(),
314                is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
315                current_capacity,
316                finished_height: finished_height_rx,
317            },
318            metrics,
319        }
320    }
321
322    /// Returns the handle to the manager.
323    pub fn handle(&self) -> ExExManagerHandle<N> {
324        self.handle.clone()
325    }
326
327    /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
328    /// readiness to receive notifications.
329    fn update_capacity(&self) {
330        let capacity = self.max_capacity.saturating_sub(self.buffer.len());
331        self.current_capacity.store(capacity, Ordering::Relaxed);
332        self.metrics.current_capacity.set(capacity as f64);
333        self.metrics.buffer_size.set(self.buffer.len() as f64);
334
335        // we can safely ignore if the channel is closed, since the manager always holds it open
336        // internally
337        let _ = self.is_ready.send(capacity > 0);
338    }
339
340    /// Pushes a new notification into the managers internal buffer, assigning the notification a
341    /// unique ID.
342    fn push_notification(&mut self, notification: ExExNotification<N>) {
343        let next_id = self.next_id;
344        self.buffer.push_back((next_id, notification));
345        self.next_id += 1;
346    }
347}
348
349impl<P, N> ExExManager<P, N>
350where
351    P: HeaderProvider,
352    N: NodePrimitives,
353{
354    /// Finalizes the WAL according to the passed finalized header.
355    ///
356    /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
357    /// necessary.
358    fn finalize_wal(&self, finalized_header: SealedHeader<N::BlockHeader>) -> eyre::Result<()> {
359        debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");
360
361        // Check if all ExExes are on the canonical chain
362        let exex_finished_heights = self
363            .exex_handles
364            .iter()
365            // Get ID and finished height for each ExEx
366            .map(|exex_handle| (&exex_handle.id, exex_handle.finished_height))
367            // Deduplicate all hashes
368            .unique_by(|(_, num_hash)| num_hash.map(|num_hash| num_hash.hash))
369            // Check if hashes are canonical
370            .map(|(exex_id, num_hash)| {
371                num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| {
372                    self.provider
373                        .is_known(&num_hash.hash)
374                        // Save the ExEx ID, finished height, and whether the hash is canonical
375                        .map(|is_canonical| (exex_id, Some(num_hash), is_canonical))
376                })
377            })
378            // We collect here to be able to log the unfinalized ExExes below
379            .collect::<Result<Vec<_>, _>>()?;
380        if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
381            // If there is a finalized header and all ExExs are on the canonical chain, finalize
382            // the WAL with either the lowest finished height among all ExExes, or finalized header
383            // – whichever is lower.
384            let lowest_finished_height = exex_finished_heights
385                .iter()
386                .copied()
387                .filter_map(|(_, num_hash, _)| num_hash)
388                .chain([(finalized_header.num_hash())])
389                .min_by_key(|num_hash| num_hash.number)
390                .unwrap();
391
392            self.wal.finalize(lowest_finished_height)?;
393            if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
394                warn!(
395                    target: "exex::manager",
396                    blocks = ?self.wal.num_blocks(),
397                    "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
398                );
399            }
400        } else {
401            let unfinalized_exexes = exex_finished_heights
402                .into_iter()
403                .filter_map(|(exex_id, num_hash, is_canonical)| {
404                    is_canonical.not().then_some((exex_id, num_hash))
405                })
406                .format_with(", ", |(exex_id, num_hash), f| {
407                    f(&format_args!("{exex_id} = {num_hash:?}"))
408                })
409                // We need this because `debug!` uses the argument twice when formatting the final
410                // log message, but the result of `format_with` can only be used once
411                .to_string();
412            debug!(
413                target: "exex::manager",
414                %unfinalized_exexes,
415                "Not all ExExes are on the canonical chain, can't finalize the WAL"
416            );
417        }
418
419        Ok(())
420    }
421}
422
423impl<P, N> Future for ExExManager<P, N>
424where
425    P: HeaderProvider + Unpin + 'static,
426    N: NodePrimitives,
427{
428    type Output = eyre::Result<()>;
429
430    /// Main loop of the [`ExExManager`]. The order of operations is as follows:
431    /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
432    ///    the latest state of [`ExExEvent::FinishedHeight`] events.
433    /// 2. Finalize the WAL with the finalized header, if necessary.
434    /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
435    ///    the internal buffer capacity.
436    /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new
437    ///    notifications.
438    /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
439    ///    update the internal buffer capacity.
440    /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
441    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442        let this = self.get_mut();
443
444        // Handle incoming ExEx events
445        for exex in &mut this.exex_handles {
446            while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
447                debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
448                exex.metrics.events_sent_total.increment(1);
449                match event {
450                    ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
451                }
452            }
453        }
454
455        // Drain the finalized header stream and finalize the WAL with the last header
456        let mut last_finalized_header = None;
457        while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
458            last_finalized_header = finalized_header;
459        }
460        if let Some(header) = last_finalized_header {
461            this.finalize_wal(header)?;
462        }
463
464        // Drain handle notifications
465        while this.buffer.len() < this.max_capacity {
466            if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
467                let committed_tip =
468                    notification.committed_chain().map(|chain| chain.tip().number());
469                let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number());
470                debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
471
472                // Commit to WAL only notifications from blockchain tree. Pipeline notifications
473                // always contain only finalized blocks.
474                match source {
475                    ExExNotificationSource::BlockchainTree => {
476                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
477                        this.wal.commit(&notification)?;
478                    }
479                    ExExNotificationSource::Pipeline => {
480                        debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
481                    }
482                }
483
484                this.push_notification(notification);
485                continue
486            }
487            break
488        }
489
490        // Update capacity
491        this.update_capacity();
492
493        // Advance all poll senders
494        let mut min_id = usize::MAX;
495        for idx in (0..this.exex_handles.len()).rev() {
496            let mut exex = this.exex_handles.swap_remove(idx);
497
498            // It is a logic error for this to ever underflow since the manager manages the
499            // notification IDs
500            let notification_index = exex
501                .next_notification_id
502                .checked_sub(this.min_id)
503                .expect("exex expected notification ID outside the manager's range");
504            if let Some(notification) = this.buffer.get(notification_index) {
505                if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
506                    // The channel was closed, which is irrecoverable for the manager
507                    return Poll::Ready(Err(err.into()))
508                }
509            }
510            min_id = min_id.min(exex.next_notification_id);
511            this.exex_handles.push(exex);
512        }
513
514        // Remove processed buffered notifications
515        debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
516        this.buffer.retain(|&(id, _)| id >= min_id);
517        this.min_id = min_id;
518
519        // Update capacity
520        this.update_capacity();
521
522        // Update watch channel block number
523        let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
524            exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
525        });
526        if let Ok(finished_height) = finished_height {
527            let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
528        }
529
530        Poll::Pending
531    }
532}
533
534/// A handle to communicate with the [`ExExManager`].
535#[derive(Debug)]
536pub struct ExExManagerHandle<N: NodePrimitives = EthPrimitives> {
537    /// Channel to send notifications to the `ExEx` manager.
538    exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification<N>)>,
539    /// The number of `ExEx`'s running on the node.
540    num_exexs: usize,
541    /// A watch channel denoting whether the manager is ready for new notifications or not.
542    ///
543    /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value.
544    /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned,
545    /// but is otherwise unused.
546    is_ready_receiver: watch::Receiver<bool>,
547    /// A reusable future that resolves when the manager is ready for new
548    /// notifications.
549    is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
550    /// The current capacity of the manager's internal notification buffer.
551    current_capacity: Arc<AtomicUsize>,
552    /// The finished height of all `ExEx`'s.
553    finished_height: watch::Receiver<FinishedExExHeight>,
554}
555
556impl<N: NodePrimitives> ExExManagerHandle<N> {
557    /// Creates an empty manager handle.
558    ///
559    /// Use this if there is no manager present.
560    ///
561    /// The handle will always be ready, and have a capacity of 0.
562    pub fn empty() -> Self {
563        let (exex_tx, _) = mpsc::unbounded_channel();
564        let (_, is_ready_rx) = watch::channel(true);
565        let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
566
567        Self {
568            exex_tx,
569            num_exexs: 0,
570            is_ready_receiver: is_ready_rx.clone(),
571            is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
572            current_capacity: Arc::new(AtomicUsize::new(0)),
573            finished_height: finished_height_rx,
574        }
575    }
576
577    /// Synchronously send a notification over the channel to all execution extensions.
578    ///
579    /// Senders should call [`Self::has_capacity`] first.
580    pub fn send(
581        &self,
582        source: ExExNotificationSource,
583        notification: ExExNotification<N>,
584    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
585        self.exex_tx.send((source, notification))
586    }
587
588    /// Asynchronously send a notification over the channel to all execution extensions.
589    ///
590    /// The returned future resolves when the notification has been delivered. If there is no
591    /// capacity in the channel, the future will wait.
592    pub async fn send_async(
593        &mut self,
594        source: ExExNotificationSource,
595        notification: ExExNotification<N>,
596    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification<N>)>> {
597        self.ready().await;
598        self.exex_tx.send((source, notification))
599    }
600
601    /// Get the current capacity of the `ExEx` manager's internal notification buffer.
602    pub fn capacity(&self) -> usize {
603        self.current_capacity.load(Ordering::Relaxed)
604    }
605
606    /// Whether there is capacity in the `ExEx` manager's internal notification buffer.
607    ///
608    /// If this returns `false`, the owner of the handle should **NOT** send new notifications over
609    /// the channel until the manager is ready again, as this can lead to unbounded memory growth.
610    pub fn has_capacity(&self) -> bool {
611        self.capacity() > 0
612    }
613
614    /// Returns `true` if there are `ExEx`'s installed in the node.
615    pub const fn has_exexs(&self) -> bool {
616        self.num_exexs > 0
617    }
618
619    /// The finished height of all `ExEx`'s.
620    pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
621        self.finished_height.clone()
622    }
623
624    /// Wait until the manager is ready for new notifications.
625    pub async fn ready(&mut self) {
626        poll_fn(|cx| self.poll_ready(cx)).await
627    }
628
629    /// Wait until the manager is ready for new notifications.
630    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
631        let rx = ready!(self.is_ready.poll(cx));
632        self.is_ready.set(make_wait_future(rx));
633        Poll::Ready(())
634    }
635}
636
637/// Creates a future that resolves once the given watch channel receiver is true.
638async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
639    // NOTE(onbjerg): We can ignore the error here, because if the channel is closed, the node
640    // is shutting down.
641    let _ = rx.wait_for(|ready| *ready).await;
642    rx
643}
644
645impl<N: NodePrimitives> Clone for ExExManagerHandle<N> {
646    fn clone(&self) -> Self {
647        Self {
648            exex_tx: self.exex_tx.clone(),
649            num_exexs: self.num_exexs,
650            is_ready_receiver: self.is_ready_receiver.clone(),
651            is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
652            current_capacity: self.current_capacity.clone(),
653            finished_height: self.finished_height.clone(),
654        }
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use crate::wal::WalResult;
662    use alloy_primitives::B256;
663    use futures::{StreamExt, TryStreamExt};
664    use rand::Rng;
665    use reth_db_common::init::init_genesis;
666    use reth_evm::test_utils::MockExecutorProvider;
667    use reth_evm_ethereum::execute::EthExecutorProvider;
668    use reth_primitives_traits::RecoveredBlock;
669    use reth_provider::{
670        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader,
671        BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
672    };
673    use reth_testing_utils::generators::{self, random_block, BlockParams};
674
675    fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
676        let (tx, rx) = watch::channel(None);
677        // Do not drop the sender, otherwise the receiver will always return an error
678        std::mem::forget(tx);
679        ForkChoiceStream::new(rx)
680    }
681
682    #[tokio::test]
683    async fn test_delivers_events() {
684        let temp_dir = tempfile::tempdir().unwrap();
685        let wal = Wal::new(temp_dir.path()).unwrap();
686
687        let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
688            "test_exex".to_string(),
689            Default::default(),
690            (),
691            MockExecutorProvider::default(),
692            wal.handle(),
693        );
694
695        // Send an event and check that it's delivered correctly
696        let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
697        event_tx.send(event).unwrap();
698        let received_event = exex_handle.receiver.recv().await.unwrap();
699        assert_eq!(received_event, event);
700    }
701
702    #[tokio::test]
703    async fn test_has_exexs() {
704        let temp_dir = tempfile::tempdir().unwrap();
705        let wal = Wal::new(temp_dir.path()).unwrap();
706
707        let (exex_handle_1, _, _) = ExExHandle::new(
708            "test_exex_1".to_string(),
709            Default::default(),
710            (),
711            MockExecutorProvider::default(),
712            wal.handle(),
713        );
714
715        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
716            .handle
717            .has_exexs());
718
719        assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
720            .handle
721            .has_exexs());
722    }
723
724    #[tokio::test]
725    async fn test_has_capacity() {
726        let temp_dir = tempfile::tempdir().unwrap();
727        let wal = Wal::new(temp_dir.path()).unwrap();
728
729        let (exex_handle_1, _, _) = ExExHandle::new(
730            "test_exex_1".to_string(),
731            Default::default(),
732            (),
733            MockExecutorProvider::default(),
734            wal.handle(),
735        );
736
737        assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
738            .handle
739            .has_capacity());
740
741        assert!(ExExManager::new(
742            (),
743            vec![exex_handle_1],
744            10,
745            wal,
746            empty_finalized_header_stream()
747        )
748        .handle
749        .has_capacity());
750    }
751
/// Pushed notifications land in the manager's buffer in push order, each paired with the next
/// sequential ID, and `next_id` advances by one per push.
#[test]
fn test_push_notification() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();

    let (handle, _, _) = ExExHandle::new(
        "test_exex".to_string(),
        Default::default(),
        (),
        MockExecutorProvider::default(),
        wal.handle(),
    );

    let mut manager =
        ExExManager::new((), vec![handle], 10, wal, empty_finalized_header_stream());

    // First notification: a single committed block at height 10.
    let mut first_block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
    first_block.set_hash(B256::new([0x01; 32]));
    first_block.set_block_number(10);

    let first = ExExNotification::ChainCommitted {
        new: Arc::new(Chain::new(
            vec![first_block.clone()],
            Default::default(),
            Default::default(),
        )),
    };

    manager.push_notification(first.clone());

    // Exactly one entry is buffered and it carries ID 0.
    assert_eq!(manager.buffer.len(), 1);
    let front = manager.buffer.front().unwrap();
    assert_eq!(front.0, 0);
    assert_eq!(front.1, first);
    assert_eq!(manager.next_id, 1);

    // Second notification: a single committed block at height 20.
    let mut second_block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
    second_block.set_hash(B256::new([0x02; 32]));
    second_block.set_block_number(20);

    let second = ExExNotification::ChainCommitted {
        new: Arc::new(Chain::new(
            vec![second_block.clone()],
            Default::default(),
            Default::default(),
        )),
    };

    manager.push_notification(second.clone());

    // Both entries are buffered in push order with IDs 0 and 1.
    assert_eq!(manager.buffer.len(), 2);
    let front = manager.buffer.front().unwrap();
    assert_eq!(front.0, 0);
    assert_eq!(front.1, first);
    let back = manager.buffer.get(1).unwrap();
    assert_eq!(back.0, 1);
    assert_eq!(back.1, second);
    assert_eq!(manager.next_id, 2);
}
806
/// After `update_capacity`, the current capacity equals the max capacity minus the number of
/// buffered notifications, and returns to the max once the buffer is emptied.
#[test]
fn test_update_capacity() {
    const MAX_CAPACITY: usize = 5;

    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();

    let (handle, _, _) = ExExHandle::new(
        "test_exex".to_string(),
        Default::default(),
        (),
        MockExecutorProvider::default(),
        wal.handle(),
    );

    let mut manager = ExExManager::new(
        (),
        vec![handle],
        MAX_CAPACITY,
        wal,
        empty_finalized_header_stream(),
    );

    // Build one notification and buffer it twice.
    let mut block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
    block.set_hash(B256::new([0x01; 32]));
    block.set_block_number(10);

    let notification = ExExNotification::ChainCommitted {
        new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
    };

    for _ in 0..2 {
        manager.push_notification(notification.clone());
    }

    // Two buffered notifications leave room for `MAX_CAPACITY - 2` more.
    manager.update_capacity();
    let remaining = manager.current_capacity.load(Ordering::Relaxed);
    assert_eq!(remaining, MAX_CAPACITY - 2);

    // An empty buffer restores the full capacity.
    manager.buffer.clear();
    manager.update_capacity();
    let remaining = manager.current_capacity.load(Ordering::Relaxed);
    assert_eq!(remaining, MAX_CAPACITY);
}
855
856    #[tokio::test]
857    async fn test_updates_block_height() {
858        let temp_dir = tempfile::tempdir().unwrap();
859        let wal = Wal::new(temp_dir.path()).unwrap();
860
861        let provider_factory = create_test_provider_factory();
862
863        let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new(
864            "test_exex".to_string(),
865            Default::default(),
866            (),
867            MockExecutorProvider::default(),
868            wal.handle(),
869        );
870
871        // Check initial block height
872        assert!(exex_handle.finished_height.is_none());
873
874        // Update the block height via an event
875        let block = BlockNumHash::new(42, B256::random());
876        event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
877
878        // Create a mock ExExManager and add the exex_handle to it
879        let exex_manager = ExExManager::new(
880            provider_factory,
881            vec![exex_handle],
882            10,
883            Wal::new(temp_dir.path()).unwrap(),
884            empty_finalized_header_stream(),
885        );
886
887        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
888
889        // Pin the ExExManager to call the poll method
890        let mut pinned_manager = std::pin::pin!(exex_manager);
891        let _ = pinned_manager.as_mut().poll(&mut cx);
892
893        // Check that the block height was updated
894        let updated_exex_handle = &pinned_manager.exex_handles[0];
895        assert_eq!(updated_exex_handle.finished_height, Some(block));
896
897        // Get the receiver for the finished height
898        let mut receiver = pinned_manager.handle.finished_height();
899
900        // Wait for a new value to be sent
901        receiver.changed().await.unwrap();
902
903        // Get the latest value
904        let finished_height = *receiver.borrow();
905
906        // The finished height should be updated to the lower block height
907        assert_eq!(finished_height, FinishedExExHeight::Height(42));
908    }
909
910    #[tokio::test]
911    async fn test_updates_block_height_lower() {
912        let temp_dir = tempfile::tempdir().unwrap();
913        let wal = Wal::new(temp_dir.path()).unwrap();
914
915        let provider_factory = create_test_provider_factory();
916
917        // Create two `ExExHandle` instances
918        let (exex_handle1, event_tx1, _) = ExExHandle::new(
919            "test_exex1".to_string(),
920            Default::default(),
921            (),
922            MockExecutorProvider::default(),
923            wal.handle(),
924        );
925        let (exex_handle2, event_tx2, _) = ExExHandle::new(
926            "test_exex2".to_string(),
927            Default::default(),
928            (),
929            MockExecutorProvider::default(),
930            wal.handle(),
931        );
932
933        let block1 = BlockNumHash::new(42, B256::random());
934        let block2 = BlockNumHash::new(10, B256::random());
935
936        // Send events to update the block heights of the two handles, with the second being lower
937        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
938        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
939
940        let exex_manager = ExExManager::new(
941            provider_factory,
942            vec![exex_handle1, exex_handle2],
943            10,
944            Wal::new(temp_dir.path()).unwrap(),
945            empty_finalized_header_stream(),
946        );
947
948        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
949
950        let mut pinned_manager = std::pin::pin!(exex_manager);
951
952        let _ = pinned_manager.as_mut().poll(&mut cx);
953
954        // Get the receiver for the finished height
955        let mut receiver = pinned_manager.handle.finished_height();
956
957        // Wait for a new value to be sent
958        receiver.changed().await.unwrap();
959
960        // Get the latest value
961        let finished_height = *receiver.borrow();
962
963        // The finished height should be updated to the lower block height
964        assert_eq!(finished_height, FinishedExExHeight::Height(10));
965    }
966
967    #[tokio::test]
968    async fn test_updates_block_height_greater() {
969        let temp_dir = tempfile::tempdir().unwrap();
970        let wal = Wal::new(temp_dir.path()).unwrap();
971
972        let provider_factory = create_test_provider_factory();
973
974        // Create two `ExExHandle` instances
975        let (exex_handle1, event_tx1, _) = ExExHandle::new(
976            "test_exex1".to_string(),
977            Default::default(),
978            (),
979            MockExecutorProvider::default(),
980            wal.handle(),
981        );
982        let (exex_handle2, event_tx2, _) = ExExHandle::new(
983            "test_exex2".to_string(),
984            Default::default(),
985            (),
986            MockExecutorProvider::default(),
987            wal.handle(),
988        );
989
990        // Assert that the initial block height is `None` for the first `ExExHandle`.
991        assert!(exex_handle1.finished_height.is_none());
992
993        let block1 = BlockNumHash::new(42, B256::random());
994        let block2 = BlockNumHash::new(100, B256::random());
995
996        // Send events to update the block heights of the two handles, with the second being higher.
997        event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
998        event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
999
1000        let exex_manager = ExExManager::new(
1001            provider_factory,
1002            vec![exex_handle1, exex_handle2],
1003            10,
1004            Wal::new(temp_dir.path()).unwrap(),
1005            empty_finalized_header_stream(),
1006        );
1007
1008        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1009
1010        let mut pinned_manager = std::pin::pin!(exex_manager);
1011
1012        let _ = pinned_manager.as_mut().poll(&mut cx);
1013
1014        // Get the receiver for the finished height
1015        let mut receiver = pinned_manager.handle.finished_height();
1016
1017        // Wait for a new value to be sent
1018        receiver.changed().await.unwrap();
1019
1020        // Get the latest value
1021        let finished_height = *receiver.borrow();
1022
1023        // The finished height should be updated to the lower block height
1024        assert_eq!(finished_height, FinishedExExHeight::Height(42));
1025
1026        // // The lower block height should be retained
1027        // let updated_exex_handle = &pinned_manager.exex_handles[0];
1028        // assert_eq!(updated_exex_handle.finished_height, Some(42));
1029    }
1030
1031    #[tokio::test]
1032    async fn test_exex_manager_capacity() {
1033        let temp_dir = tempfile::tempdir().unwrap();
1034        let wal = Wal::new(temp_dir.path()).unwrap();
1035
1036        let provider_factory = create_test_provider_factory();
1037
1038        let (exex_handle_1, _, _) = ExExHandle::new(
1039            "test_exex_1".to_string(),
1040            Default::default(),
1041            (),
1042            MockExecutorProvider::default(),
1043            wal.handle(),
1044        );
1045
1046        // Create an ExExManager with a small max capacity
1047        let max_capacity = 2;
1048        let exex_manager = ExExManager::new(
1049            provider_factory,
1050            vec![exex_handle_1],
1051            max_capacity,
1052            Wal::new(temp_dir.path()).unwrap(),
1053            empty_finalized_header_stream(),
1054        );
1055
1056        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1057
1058        // Setup a notification
1059        let notification = ExExNotification::ChainCommitted {
1060            new: Arc::new(Chain::new(
1061                vec![Default::default()],
1062                Default::default(),
1063                Default::default(),
1064            )),
1065        };
1066
1067        // Send notifications to go over the max capacity
1068        exex_manager
1069            .handle
1070            .exex_tx
1071            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1072            .unwrap();
1073        exex_manager
1074            .handle
1075            .exex_tx
1076            .send((ExExNotificationSource::BlockchainTree, notification.clone()))
1077            .unwrap();
1078        exex_manager
1079            .handle
1080            .exex_tx
1081            .send((ExExNotificationSource::BlockchainTree, notification))
1082            .unwrap();
1083
1084        // Pin the ExExManager to call the poll method
1085        let mut pinned_manager = std::pin::pin!(exex_manager);
1086
1087        // Before polling, the next notification ID should be 0 and the buffer should be empty
1088        assert_eq!(pinned_manager.next_id, 0);
1089        assert_eq!(pinned_manager.buffer.len(), 0);
1090
1091        let _ = pinned_manager.as_mut().poll(&mut cx);
1092
1093        // After polling, the next notification ID and buffer size should be updated
1094        assert_eq!(pinned_manager.next_id, 2);
1095        assert_eq!(pinned_manager.buffer.len(), 2);
1096    }
1097
1098    #[tokio::test]
1099    async fn exex_handle_new() {
1100        let provider_factory = create_test_provider_factory();
1101        init_genesis(&provider_factory).unwrap();
1102        let provider = BlockchainProvider::new(provider_factory).unwrap();
1103
1104        let temp_dir = tempfile::tempdir().unwrap();
1105        let wal = Wal::new(temp_dir.path()).unwrap();
1106
1107        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1108            "test_exex".to_string(),
1109            Default::default(),
1110            provider,
1111            EthExecutorProvider::mainnet(),
1112            wal.handle(),
1113        );
1114
1115        // Check initial state
1116        assert_eq!(exex_handle.id, "test_exex");
1117        assert_eq!(exex_handle.next_notification_id, 0);
1118
1119        // Setup two blocks for the chain commit notification
1120        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1121        block1.set_hash(B256::new([0x01; 32]));
1122        block1.set_block_number(10);
1123
1124        let mut block2: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1125        block2.set_hash(B256::new([0x02; 32]));
1126        block2.set_block_number(11);
1127
1128        // Setup a notification
1129        let notification = ExExNotification::ChainCommitted {
1130            new: Arc::new(Chain::new(
1131                vec![block1.clone(), block2.clone()],
1132                Default::default(),
1133                Default::default(),
1134            )),
1135        };
1136
1137        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1138
1139        // Send a notification and ensure it's received correctly
1140        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1141            Poll::Ready(Ok(())) => {
1142                let received_notification = notifications.next().await.unwrap().unwrap();
1143                assert_eq!(received_notification, notification);
1144            }
1145            Poll::Pending => panic!("Notification send is pending"),
1146            Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
1147        }
1148
1149        // Ensure the notification ID was incremented
1150        assert_eq!(exex_handle.next_notification_id, 23);
1151    }
1152
1153    #[tokio::test]
1154    async fn test_notification_if_finished_height_gt_chain_tip() {
1155        let provider_factory = create_test_provider_factory();
1156        init_genesis(&provider_factory).unwrap();
1157        let provider = BlockchainProvider::new(provider_factory).unwrap();
1158
1159        let temp_dir = tempfile::tempdir().unwrap();
1160        let wal = Wal::new(temp_dir.path()).unwrap();
1161
1162        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1163            "test_exex".to_string(),
1164            Default::default(),
1165            provider,
1166            EthExecutorProvider::mainnet(),
1167            wal.handle(),
1168        );
1169
1170        // Set finished_height to a value higher than the block tip
1171        exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
1172
1173        let mut block1: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
1174        block1.set_hash(B256::new([0x01; 32]));
1175        block1.set_block_number(10);
1176
1177        let notification = ExExNotification::ChainCommitted {
1178            new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
1179        };
1180
1181        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1182
1183        // Send the notification
1184        match exex_handle.send(&mut cx, &(22, notification)) {
1185            Poll::Ready(Ok(())) => {
1186                poll_fn(|cx| {
1187                    // The notification should be skipped, so nothing should be sent.
1188                    // Check that the receiver channel is indeed empty
1189                    assert!(notifications.poll_next_unpin(cx).is_pending());
1190                    Poll::Ready(())
1191                })
1192                .await;
1193            }
1194            Poll::Pending | Poll::Ready(Err(_)) => {
1195                panic!("Notification should not be pending or fail");
1196            }
1197        }
1198
1199        // Ensure the notification ID was still incremented
1200        assert_eq!(exex_handle.next_notification_id, 23);
1201    }
1202
1203    #[tokio::test]
1204    async fn test_sends_chain_reorged_notification() {
1205        let provider_factory = create_test_provider_factory();
1206        init_genesis(&provider_factory).unwrap();
1207        let provider = BlockchainProvider::new(provider_factory).unwrap();
1208
1209        let temp_dir = tempfile::tempdir().unwrap();
1210        let wal = Wal::new(temp_dir.path()).unwrap();
1211
1212        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1213            "test_exex".to_string(),
1214            Default::default(),
1215            provider,
1216            EthExecutorProvider::mainnet(),
1217            wal.handle(),
1218        );
1219
1220        let notification = ExExNotification::ChainReorged {
1221            old: Arc::new(Chain::default()),
1222            new: Arc::new(Chain::default()),
1223        };
1224
1225        // Even if the finished height is higher than the tip of the new chain, the reorg
1226        // notification should be received
1227        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1228
1229        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1230
1231        // Send the notification
1232        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1233            Poll::Ready(Ok(())) => {
1234                let received_notification = notifications.next().await.unwrap().unwrap();
1235                assert_eq!(received_notification, notification);
1236            }
1237            Poll::Pending | Poll::Ready(Err(_)) => {
1238                panic!("Notification should not be pending or fail")
1239            }
1240        }
1241
1242        // Ensure the notification ID was incremented
1243        assert_eq!(exex_handle.next_notification_id, 23);
1244    }
1245
1246    #[tokio::test]
1247    async fn test_sends_chain_reverted_notification() {
1248        let provider_factory = create_test_provider_factory();
1249        init_genesis(&provider_factory).unwrap();
1250        let provider = BlockchainProvider::new(provider_factory).unwrap();
1251
1252        let temp_dir = tempfile::tempdir().unwrap();
1253        let wal = Wal::new(temp_dir.path()).unwrap();
1254
1255        let (mut exex_handle, _, mut notifications) = ExExHandle::new(
1256            "test_exex".to_string(),
1257            Default::default(),
1258            provider,
1259            EthExecutorProvider::mainnet(),
1260            wal.handle(),
1261        );
1262
1263        let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
1264
1265        // Even if the finished height is higher than the tip of the new chain, the reorg
1266        // notification should be received
1267        exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
1268
1269        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1270
1271        // Send the notification
1272        match exex_handle.send(&mut cx, &(22, notification.clone())) {
1273            Poll::Ready(Ok(())) => {
1274                let received_notification = notifications.next().await.unwrap().unwrap();
1275                assert_eq!(received_notification, notification);
1276            }
1277            Poll::Pending | Poll::Ready(Err(_)) => {
1278                panic!("Notification should not be pending or fail")
1279            }
1280        }
1281
1282        // Ensure the notification ID was incremented
1283        assert_eq!(exex_handle.next_notification_id, 23);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_exex_wal() -> eyre::Result<()> {
1288        reth_tracing::init_test_tracing();
1289
1290        let mut rng = generators::rng();
1291
1292        let provider_factory = create_test_provider_factory();
1293        let genesis_hash = init_genesis(&provider_factory).unwrap();
1294        let genesis_block = provider_factory
1295            .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
1296            .unwrap()
1297            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
1298
1299        let block = random_block(
1300            &mut rng,
1301            genesis_block.number + 1,
1302            BlockParams { parent: Some(genesis_hash), ..Default::default() },
1303        )
1304        .try_recover()
1305        .unwrap();
1306        let provider_rw = provider_factory.database_provider_rw().unwrap();
1307        provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
1308        provider_rw.commit().unwrap();
1309
1310        let provider = BlockchainProvider::new(provider_factory).unwrap();
1311
1312        let temp_dir = tempfile::tempdir().unwrap();
1313        let wal = Wal::new(temp_dir.path()).unwrap();
1314
1315        let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
1316            "test_exex".to_string(),
1317            Default::default(),
1318            provider.clone(),
1319            EthExecutorProvider::mainnet(),
1320            wal.handle(),
1321        );
1322
1323        let genesis_notification = ExExNotification::ChainCommitted {
1324            new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
1325        };
1326        let notification = ExExNotification::ChainCommitted {
1327            new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
1328        };
1329
1330        let (finalized_headers_tx, rx) = watch::channel(None);
1331        finalized_headers_tx.send(Some(genesis_block.clone_sealed_header()))?;
1332        let finalized_header_stream = ForkChoiceStream::new(rx);
1333
1334        let mut exex_manager = std::pin::pin!(ExExManager::new(
1335            provider,
1336            vec![exex_handle],
1337            2,
1338            wal,
1339            finalized_header_stream
1340        ));
1341
1342        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
1343
1344        exex_manager
1345            .handle()
1346            .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
1347        exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
1348
1349        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1350        assert_eq!(
1351            notifications.try_poll_next_unpin(&mut cx)?,
1352            Poll::Ready(Some(genesis_notification))
1353        );
1354        assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
1355        assert_eq!(
1356            notifications.try_poll_next_unpin(&mut cx)?,
1357            Poll::Ready(Some(notification.clone()))
1358        );
1359        // WAL shouldn't contain the genesis notification, because it's finalized
1360        assert_eq!(
1361            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1362            [notification.clone()]
1363        );
1364
1365        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1366        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1367        // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
1368        assert_eq!(
1369            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1370            [notification.clone()]
1371        );
1372
1373        // Send a `FinishedHeight` event with a non-canonical block
1374        events_tx
1375            .send(ExExEvent::FinishedHeight((rng.random::<u64>(), rng.random::<B256>()).into()))
1376            .unwrap();
1377
1378        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1379        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1380        // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
1381        // non-canonical block
1382        assert_eq!(
1383            exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
1384            [notification]
1385        );
1386
1387        // Send a `FinishedHeight` event with a canonical block
1388        events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
1389
1390        finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
1391        assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
1392        // WAL is finalized
1393        assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
1394
1395        Ok(())
1396    }
1397}