reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_evm::execute::BlockExecutorProvider;
6use reth_exex_types::ExExHead;
7use reth_node_api::NodePrimitives;
8use reth_primitives::EthPrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12    fmt::Debug,
13    pin::Pin,
14    sync::Arc,
15    task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
20/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
21/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
22#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25    E: BlockExecutorProvider,
26{
27    inner: ExExNotificationsInner<P, E>,
28}
29
30/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
31/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
32/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
33pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
37    ///
38    /// It's a no-op if the stream has already been configured without a head.
39    ///
40    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
41    fn set_without_head(&mut self);
42
43    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
44    /// head.
45    ///
46    /// It's a no-op if the stream has already been configured with a head.
47    ///
48    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
49    fn set_with_head(&mut self, exex_head: ExExHead);
50
51    /// Returns a new [`ExExNotificationsStream`] without a head.
52    ///
53    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
54    fn without_head(self) -> Self
55    where
56        Self: Sized;
57
58    /// Returns a new [`ExExNotificationsStream`] with the provided head.
59    ///
60    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
61    fn with_head(self, exex_head: ExExHead) -> Self
62    where
63        Self: Sized;
64}
65
66#[derive(Debug)]
67#[allow(clippy::large_enum_variant)]
68enum ExExNotificationsInner<P, E>
69where
70    E: BlockExecutorProvider,
71{
72    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
73    WithoutHead(ExExNotificationsWithoutHead<P, E>),
74    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
75    /// are committed or reverted after the given head.
76    WithHead(ExExNotificationsWithHead<P, E>),
77    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
78    /// [`ExExNotificationsInner::WithHead`].
79    Invalid,
80}
81
82impl<P, E> ExExNotifications<P, E>
83where
84    E: BlockExecutorProvider,
85{
86    /// Creates a new stream of [`ExExNotifications`] without a head.
87    pub const fn new(
88        node_head: BlockNumHash,
89        provider: P,
90        executor: E,
91        notifications: Receiver<ExExNotification<E::Primitives>>,
92        wal_handle: WalHandle<E::Primitives>,
93    ) -> Self {
94        Self {
95            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
96                node_head,
97                provider,
98                executor,
99                notifications,
100                wal_handle,
101            )),
102        }
103    }
104}
105
106impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
107where
108    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
109    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
110        + Clone
111        + Unpin
112        + 'static,
113{
114    fn set_without_head(&mut self) {
115        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
116        self.inner = ExExNotificationsInner::WithoutHead(match current {
117            ExExNotificationsInner::WithoutHead(notifications) => notifications,
118            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
119                notifications.initial_local_head,
120                notifications.provider,
121                notifications.executor,
122                notifications.notifications,
123                notifications.wal_handle,
124            ),
125            ExExNotificationsInner::Invalid => unreachable!(),
126        });
127    }
128
129    fn set_with_head(&mut self, exex_head: ExExHead) {
130        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
131        self.inner = ExExNotificationsInner::WithHead(match current {
132            ExExNotificationsInner::WithoutHead(notifications) => {
133                notifications.with_head(exex_head)
134            }
135            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new(
136                notifications.initial_local_head,
137                notifications.provider,
138                notifications.executor,
139                notifications.notifications,
140                notifications.wal_handle,
141                exex_head,
142            ),
143            ExExNotificationsInner::Invalid => unreachable!(),
144        });
145    }
146
147    fn without_head(mut self) -> Self {
148        self.set_without_head();
149        self
150    }
151
152    fn with_head(mut self, exex_head: ExExHead) -> Self {
153        self.set_with_head(exex_head);
154        self
155    }
156}
157
158impl<P, E> Stream for ExExNotifications<P, E>
159where
160    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
161    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
162        + Clone
163        + Unpin
164        + 'static,
165{
166    type Item = eyre::Result<ExExNotification<E::Primitives>>;
167
168    fn poll_next(
169        self: std::pin::Pin<&mut Self>,
170        cx: &mut std::task::Context<'_>,
171    ) -> std::task::Poll<Option<Self::Item>> {
172        match &mut self.get_mut().inner {
173            ExExNotificationsInner::WithoutHead(notifications) => {
174                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
175            }
176            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
177            ExExNotificationsInner::Invalid => unreachable!(),
178        }
179    }
180}
181
182/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
183pub struct ExExNotificationsWithoutHead<P, E>
184where
185    E: BlockExecutorProvider,
186{
187    node_head: BlockNumHash,
188    provider: P,
189    executor: E,
190    notifications: Receiver<ExExNotification<E::Primitives>>,
191    wal_handle: WalHandle<E::Primitives>,
192}
193
194impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
195where
196    E: Debug + BlockExecutorProvider,
197{
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        f.debug_struct("ExExNotifications")
200            .field("provider", &self.provider)
201            .field("executor", &self.executor)
202            .field("notifications", &self.notifications)
203            .finish()
204    }
205}
206
207impl<P, E> ExExNotificationsWithoutHead<P, E>
208where
209    E: BlockExecutorProvider,
210{
211    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
212    const fn new(
213        node_head: BlockNumHash,
214        provider: P,
215        executor: E,
216        notifications: Receiver<ExExNotification<E::Primitives>>,
217        wal_handle: WalHandle<E::Primitives>,
218    ) -> Self {
219        Self { node_head, provider, executor, notifications, wal_handle }
220    }
221
222    /// Subscribe to notifications with the given head.
223    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
224        ExExNotificationsWithHead::new(
225            self.node_head,
226            self.provider,
227            self.executor,
228            self.notifications,
229            self.wal_handle,
230            head,
231        )
232    }
233}
234
235impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
236where
237    E: Unpin + BlockExecutorProvider,
238{
239    type Item = ExExNotification<E::Primitives>;
240
241    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
242        self.get_mut().notifications.poll_recv(cx)
243    }
244}
245
246/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
247/// committed or reverted after the given head. The head is the ExEx's latest view of the host
248/// chain.
249///
250/// Notifications will be sent starting from the head, not inclusive. For example, if
251/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
252/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
253/// process block 11.
254#[derive(Debug)]
255pub struct ExExNotificationsWithHead<P, E>
256where
257    E: BlockExecutorProvider,
258{
259    /// The node's local head at launch.
260    initial_local_head: BlockNumHash,
261    provider: P,
262    executor: E,
263    notifications: Receiver<ExExNotification<E::Primitives>>,
264    wal_handle: WalHandle<E::Primitives>,
265    /// The exex head at launch
266    initial_exex_head: ExExHead,
267
268    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
269    /// revert its head.
270    pending_check_canonical: bool,
271    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
272    /// the missing blocks.
273    pending_check_backfill: bool,
274    /// The backfill job to run before consuming any notifications.
275    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
276}
277
278impl<P, E> ExExNotificationsWithHead<P, E>
279where
280    E: BlockExecutorProvider,
281{
282    /// Creates a new [`ExExNotificationsWithHead`].
283    const fn new(
284        node_head: BlockNumHash,
285        provider: P,
286        executor: E,
287        notifications: Receiver<ExExNotification<E::Primitives>>,
288        wal_handle: WalHandle<E::Primitives>,
289        exex_head: ExExHead,
290    ) -> Self {
291        Self {
292            initial_local_head: node_head,
293            provider,
294            executor,
295            notifications,
296            wal_handle,
297            initial_exex_head: exex_head,
298            pending_check_canonical: true,
299            pending_check_backfill: true,
300            backfill_job: None,
301        }
302    }
303}
304
305impl<P, E> ExExNotificationsWithHead<P, E>
306where
307    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
308    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
309        + Clone
310        + Unpin
311        + 'static,
312{
313    /// Checks if the ExEx head is on the canonical chain.
314    ///
315    /// If the head block is not found in the database or it's ahead of the node head, it means
316    /// we're not on the canonical chain and we need to revert the notification with the ExEx
317    /// head block.
318    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
319        if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
320            self.initial_exex_head.block.number <= self.initial_local_head.number
321        {
322            // we have the targeted block and that block is below the current head
323            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
324            return Ok(None)
325        }
326
327        // If the head block is not found in the database, it means we're not on the canonical
328        // chain.
329
330        // Get the committed notification for the head block from the WAL.
331        let Some(notification) = self
332            .wal_handle
333            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
334        else {
335            // it's possible that the exex head is further ahead
336            if self.initial_exex_head.block.number > self.initial_local_head.number {
337                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
338                return Ok(None);
339            }
340
341            return Err(eyre::eyre!(
342                "Could not find notification for block hash {:?} in the WAL",
343                self.initial_exex_head.block.hash
344            ))
345        };
346
347        // Update the head block hash to the parent hash of the first committed block.
348        let committed_chain = notification.committed_chain().unwrap();
349        let new_exex_head =
350            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
351        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
352        self.initial_exex_head.block = new_exex_head;
353
354        // Return an inverted notification. See the documentation for
355        // `ExExNotification::into_inverted`.
356        Ok(Some(notification.into_inverted()))
357    }
358
359    /// Compares the node head against the ExEx head, and backfills if needed.
360    ///
361    /// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the
362    /// canonical chain.
363    ///
364    /// Possible situations are:
365    /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
366    ///   node database.
367    /// - ExEx is at the same block number as the node head (`node_head.number ==
368    ///   exex_head.number`). Nothing to do.
369    fn check_backfill(&mut self) -> eyre::Result<()> {
370        let backfill_job_factory =
371            BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
372        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
373            std::cmp::Ordering::Less => {
374                // ExEx is behind the node head, start backfill
375                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
376                let backfill = backfill_job_factory
377                    .backfill(
378                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
379                    )
380                    .into_stream();
381                self.backfill_job = Some(backfill);
382            }
383            std::cmp::Ordering::Equal => {
384                debug!(target: "exex::notifications", "ExEx is at the node head");
385            }
386            std::cmp::Ordering::Greater => {
387                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
388            }
389        };
390
391        Ok(())
392    }
393}
394
395impl<P, E> Stream for ExExNotificationsWithHead<P, E>
396where
397    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
398    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
399        + Clone
400        + Unpin
401        + 'static,
402{
403    type Item = eyre::Result<ExExNotification<E::Primitives>>;
404
405    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
406        let this = self.get_mut();
407
408        // 1. Check once whether we need to retrieve a notification gap from the WAL.
409        if this.pending_check_canonical {
410            if let Some(canonical_notification) = this.check_canonical()? {
411                return Poll::Ready(Some(Ok(canonical_notification)))
412            }
413
414            // ExEx head is on the canonical chain, we no longer need to check it
415            this.pending_check_canonical = false;
416        }
417
418        // 2. Check once whether we need to trigger backfill sync
419        if this.pending_check_backfill {
420            this.check_backfill()?;
421            this.pending_check_backfill = false;
422        }
423
424        // 3. If backfill is in progress yield new notifications
425        if let Some(backfill_job) = &mut this.backfill_job {
426            debug!(target: "exex::notifications", "Polling backfill job");
427            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
428                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
429                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
430                    new: Arc::new(chain),
431                })))
432            }
433
434            // Backfill job is done, remove it
435            this.backfill_job = None;
436        }
437
438        // 4. Otherwise advance the regular event stream
439        loop {
440            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
441                return Poll::Ready(None)
442            };
443
444            // 5. In case the exex is ahead of the new tip, we must skip it
445            if let Some(committed) = notification.committed_chain() {
446                // inclusive check because we should start with `exex.head + 1`
447                if this.initial_exex_head.block.number >= committed.tip().number() {
448                    continue
449                }
450            }
451
452            return Poll::Ready(Some(Ok(notification)))
453        }
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::Wal;
461    use alloy_consensus::Header;
462    use alloy_eips::BlockNumHash;
463    use eyre::OptionExt;
464    use futures::StreamExt;
465    use reth_db_common::init::init_genesis;
466    use reth_evm_ethereum::execute::EthExecutorProvider;
467    use reth_primitives::Block;
468    use reth_primitives_traits::Block as _;
469    use reth_provider::{
470        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
471        Chain, DatabaseProviderFactory, StorageLocation,
472    };
473    use reth_testing_utils::generators::{self, random_block, BlockParams};
474    use tokio::sync::mpsc;
475
476    #[tokio::test]
477    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
478        let mut rng = generators::rng();
479
480        let temp_dir = tempfile::tempdir().unwrap();
481        let wal = Wal::new(temp_dir.path()).unwrap();
482
483        let provider_factory = create_test_provider_factory();
484        let genesis_hash = init_genesis(&provider_factory)?;
485        let genesis_block = provider_factory
486            .block(genesis_hash.into())?
487            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
488
489        let provider = BlockchainProvider::new(provider_factory.clone())?;
490
491        let node_head_block = random_block(
492            &mut rng,
493            genesis_block.number + 1,
494            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
495        );
496        let provider_rw = provider_factory.provider_rw()?;
497        provider_rw
498            .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
499        provider_rw.commit()?;
500
501        let node_head = node_head_block.num_hash();
502        let exex_head =
503            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
504
505        let notification = ExExNotification::ChainCommitted {
506            new: Arc::new(Chain::new(
507                vec![random_block(
508                    &mut rng,
509                    node_head.number + 1,
510                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
511                )
512                .try_recover()?],
513                Default::default(),
514                None,
515            )),
516        };
517
518        let (notifications_tx, notifications_rx) = mpsc::channel(1);
519
520        notifications_tx.send(notification.clone()).await?;
521
522        let mut notifications = ExExNotificationsWithoutHead::new(
523            node_head,
524            provider,
525            EthExecutorProvider::mainnet(),
526            notifications_rx,
527            wal.handle(),
528        )
529        .with_head(exex_head);
530
531        // First notification is the backfill of missing blocks from the canonical chain
532        assert_eq!(
533            notifications.next().await.transpose()?,
534            Some(ExExNotification::ChainCommitted {
535                new: Arc::new(
536                    BackfillJobFactory::new(
537                        notifications.executor.clone(),
538                        notifications.provider.clone()
539                    )
540                    .backfill(1..=1)
541                    .next()
542                    .ok_or_eyre("failed to backfill")??
543                )
544            })
545        );
546
547        // Second notification is the actual notification that we sent before
548        assert_eq!(notifications.next().await.transpose()?, Some(notification));
549
550        Ok(())
551    }
552
553    #[tokio::test]
554    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
555        let temp_dir = tempfile::tempdir().unwrap();
556        let wal = Wal::new(temp_dir.path()).unwrap();
557
558        let provider_factory = create_test_provider_factory();
559        let genesis_hash = init_genesis(&provider_factory)?;
560        let genesis_block = provider_factory
561            .block(genesis_hash.into())?
562            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
563
564        let provider = BlockchainProvider::new(provider_factory)?;
565
566        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
567        let exex_head = ExExHead { block: node_head };
568
569        let notification = ExExNotification::ChainCommitted {
570            new: Arc::new(Chain::new(
571                vec![Block {
572                    header: Header {
573                        parent_hash: node_head.hash,
574                        number: node_head.number + 1,
575                        ..Default::default()
576                    },
577                    ..Default::default()
578                }
579                .seal_slow()
580                .try_recover()?],
581                Default::default(),
582                None,
583            )),
584        };
585
586        let (notifications_tx, notifications_rx) = mpsc::channel(1);
587
588        notifications_tx.send(notification.clone()).await?;
589
590        let mut notifications = ExExNotificationsWithoutHead::new(
591            node_head,
592            provider,
593            EthExecutorProvider::mainnet(),
594            notifications_rx,
595            wal.handle(),
596        )
597        .with_head(exex_head);
598
599        let new_notification = notifications.next().await.transpose()?;
600        assert_eq!(new_notification, Some(notification));
601
602        Ok(())
603    }
604
605    #[tokio::test]
606    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
607        let mut rng = generators::rng();
608
609        let temp_dir = tempfile::tempdir().unwrap();
610        let wal = Wal::new(temp_dir.path()).unwrap();
611
612        let provider_factory = create_test_provider_factory();
613        let genesis_hash = init_genesis(&provider_factory)?;
614        let genesis_block = provider_factory
615            .block(genesis_hash.into())?
616            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
617
618        let provider = BlockchainProvider::new(provider_factory)?;
619
620        let node_head_block = random_block(
621            &mut rng,
622            genesis_block.number + 1,
623            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
624        )
625        .try_recover()?;
626        let node_head = node_head_block.num_hash();
627        let provider_rw = provider.database_provider_rw()?;
628        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
629        provider_rw.commit()?;
630        let node_head_notification = ExExNotification::ChainCommitted {
631            new: Arc::new(
632                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
633                    .backfill(node_head.number..=node_head.number)
634                    .next()
635                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
636            ),
637        };
638
639        let exex_head_block = random_block(
640            &mut rng,
641            genesis_block.number + 1,
642            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
643        );
644        let exex_head = ExExHead { block: exex_head_block.num_hash() };
645        let exex_head_notification = ExExNotification::ChainCommitted {
646            new: Arc::new(Chain::new(
647                vec![exex_head_block.clone().try_recover()?],
648                Default::default(),
649                None,
650            )),
651        };
652        wal.commit(&exex_head_notification)?;
653
654        let new_notification = ExExNotification::ChainCommitted {
655            new: Arc::new(Chain::new(
656                vec![random_block(
657                    &mut rng,
658                    node_head.number + 1,
659                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
660                )
661                .try_recover()?],
662                Default::default(),
663                None,
664            )),
665        };
666
667        let (notifications_tx, notifications_rx) = mpsc::channel(1);
668
669        notifications_tx.send(new_notification.clone()).await?;
670
671        let mut notifications = ExExNotificationsWithoutHead::new(
672            node_head,
673            provider,
674            EthExecutorProvider::mainnet(),
675            notifications_rx,
676            wal.handle(),
677        )
678        .with_head(exex_head);
679
680        // First notification is the revert of the ExEx head block to get back to the canonical
681        // chain
682        assert_eq!(
683            notifications.next().await.transpose()?,
684            Some(exex_head_notification.into_inverted())
685        );
686        // Second notification is the backfilled block from the canonical chain to get back to the
687        // canonical tip
688        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
689        // Third notification is the actual notification that we sent before
690        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
691
692        Ok(())
693    }
694
695    #[tokio::test]
696    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
697        reth_tracing::init_test_tracing();
698        let mut rng = generators::rng();
699
700        let temp_dir = tempfile::tempdir().unwrap();
701        let wal = Wal::new(temp_dir.path()).unwrap();
702
703        let provider_factory = create_test_provider_factory();
704        let genesis_hash = init_genesis(&provider_factory)?;
705        let genesis_block = provider_factory
706            .block(genesis_hash.into())?
707            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
708
709        let provider = BlockchainProvider::new(provider_factory)?;
710
711        let exex_head_block = random_block(
712            &mut rng,
713            genesis_block.number + 1,
714            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
715        );
716        let exex_head_notification = ExExNotification::ChainCommitted {
717            new: Arc::new(Chain::new(
718                vec![exex_head_block.clone().try_recover()?],
719                Default::default(),
720                None,
721            )),
722        };
723        wal.commit(&exex_head_notification)?;
724
725        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
726        let exex_head = ExExHead {
727            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
728        };
729
730        let new_notification = ExExNotification::ChainCommitted {
731            new: Arc::new(Chain::new(
732                vec![random_block(
733                    &mut rng,
734                    genesis_block.number + 1,
735                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
736                )
737                .try_recover()?],
738                Default::default(),
739                None,
740            )),
741        };
742
743        let (notifications_tx, notifications_rx) = mpsc::channel(1);
744
745        notifications_tx.send(new_notification.clone()).await?;
746
747        let mut notifications = ExExNotificationsWithoutHead::new(
748            node_head,
749            provider,
750            EthExecutorProvider::mainnet(),
751            notifications_rx,
752            wal.handle(),
753        )
754        .with_head(exex_head);
755
756        // First notification is the revert of the ExEx head block to get back to the canonical
757        // chain
758        assert_eq!(
759            notifications.next().await.transpose()?,
760            Some(exex_head_notification.into_inverted())
761        );
762
763        // Second notification is the actual notification that we sent before
764        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
765
766        Ok(())
767    }
768}