reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12    fmt::Debug,
13    pin::Pin,
14    sync::Arc,
15    task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
#[derive(Debug)]
pub struct ExExNotifications<P, E>
where
    E: ConfigureEvm,
{
    /// The current mode of the stream: either without a head or with a head. Swapped in place
    /// when the stream is reconfigured.
    inner: ExExNotificationsInner<P, E>,
}
29
/// A trait that represents a stream of [`ExExNotification`]s. The stream will emit notifications
/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
{
    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
    ///
    /// It's a no-op if the stream has already been configured without a head.
    ///
    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
    fn set_without_head(&mut self);

    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
    /// head.
    ///
    /// If the stream has already been configured with a head, the [`ExExNotifications`]
    /// implementation replaces that head with `exex_head`: the stream is rebuilt, so the
    /// canonical and backfill checks run again and any backfill job in progress is dropped.
    ///
    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
    fn set_with_head(&mut self, exex_head: ExExHead);

    /// Returns a new [`ExExNotificationsStream`] without a head.
    ///
    /// This is the by-value counterpart of [`ExExNotificationsStream::set_without_head`].
    ///
    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
    fn without_head(self) -> Self
    where
        Self: Sized;

    /// Returns a new [`ExExNotificationsStream`] with the provided head.
    ///
    /// This is the by-value counterpart of [`ExExNotificationsStream::set_with_head`].
    ///
    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
    fn with_head(self, exex_head: ExExHead) -> Self
    where
        Self: Sized;
}
65
/// The mode an [`ExExNotifications`] stream is currently in.
#[derive(Debug)]
enum ExExNotificationsInner<P, E>
where
    E: ConfigureEvm,
{
    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
    WithoutHead(ExExNotificationsWithoutHead<P, E>),
    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
    /// are committed or reverted after the given head.
    WithHead(Box<ExExNotificationsWithHead<P, E>>),
    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
    /// [`ExExNotificationsInner::WithHead`].
    ///
    /// It is only a placeholder while the previous state is taken by value; the rest of the file
    /// treats observing it as unreachable.
    Invalid,
}
80
impl<P, E> ExExNotifications<P, E>
where
    E: ConfigureEvm,
{
    /// Creates a new stream of [`ExExNotifications`] without a head.
    ///
    /// All arguments are forwarded unchanged to [`ExExNotificationsWithoutHead::new`]:
    /// - `node_head`: the node head to remember in case the stream is later given an ExEx head.
    /// - `provider`, `evm_config`: kept for the canonical check and backfill of the with-head
    ///   mode.
    /// - `notifications`: receiving half of the channel the notifications arrive on.
    /// - `wal_handle`: handle to the WAL, kept for the with-head mode.
    pub const fn new(
        node_head: BlockNumHash,
        provider: P,
        evm_config: E,
        notifications: Receiver<ExExNotification<E::Primitives>>,
        wal_handle: WalHandle<E::Primitives>,
    ) -> Self {
        Self {
            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
                node_head,
                provider,
                evm_config,
                notifications,
                wal_handle,
            )),
        }
    }
}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
109{
110    fn set_without_head(&mut self) {
111        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
112        self.inner = ExExNotificationsInner::WithoutHead(match current {
113            ExExNotificationsInner::WithoutHead(notifications) => notifications,
114            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
115                notifications.initial_local_head,
116                notifications.provider,
117                notifications.evm_config,
118                notifications.notifications,
119                notifications.wal_handle,
120            ),
121            ExExNotificationsInner::Invalid => unreachable!(),
122        });
123    }
124
125    fn set_with_head(&mut self, exex_head: ExExHead) {
126        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
127        self.inner = ExExNotificationsInner::WithHead(match current {
128            ExExNotificationsInner::WithoutHead(notifications) => {
129                Box::new(notifications.with_head(exex_head))
130            }
131            ExExNotificationsInner::WithHead(notifications) => {
132                Box::new(ExExNotificationsWithHead::new(
133                    notifications.initial_local_head,
134                    notifications.provider,
135                    notifications.evm_config,
136                    notifications.notifications,
137                    notifications.wal_handle,
138                    exex_head,
139                ))
140            }
141            ExExNotificationsInner::Invalid => unreachable!(),
142        });
143    }
144
145    fn without_head(mut self) -> Self {
146        self.set_without_head();
147        self
148    }
149
150    fn with_head(mut self, exex_head: ExExHead) -> Self {
151        self.set_with_head(exex_head);
152        self
153    }
154}
155
156impl<P, E> Stream for ExExNotifications<P, E>
157where
158    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
159    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
160{
161    type Item = eyre::Result<ExExNotification<E::Primitives>>;
162
163    fn poll_next(
164        self: std::pin::Pin<&mut Self>,
165        cx: &mut std::task::Context<'_>,
166    ) -> std::task::Poll<Option<Self::Item>> {
167        match &mut self.get_mut().inner {
168            ExExNotificationsInner::WithoutHead(notifications) => {
169                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
170            }
171            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
172            ExExNotificationsInner::Invalid => unreachable!(),
173        }
174    }
175}
176
/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
pub struct ExExNotificationsWithoutHead<P, E>
where
    E: ConfigureEvm,
{
    /// The node head, handed over to [`ExExNotificationsWithHead`] when a head is set.
    node_head: BlockNumHash,
    /// Not used by this stream itself; handed over to [`ExExNotificationsWithHead`].
    provider: P,
    /// Not used by this stream itself; handed over to [`ExExNotificationsWithHead`].
    evm_config: E,
    /// Receiving half of the channel the notifications arrive on.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Not used by this stream itself; handed over to [`ExExNotificationsWithHead`].
    wal_handle: WalHandle<E::Primitives>,
}
188
189impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
190where
191    E: ConfigureEvm + Debug,
192{
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("ExExNotifications")
195            .field("provider", &self.provider)
196            .field("evm_config", &self.evm_config)
197            .field("notifications", &self.notifications)
198            .finish()
199    }
200}
201
202impl<P, E> ExExNotificationsWithoutHead<P, E>
203where
204    E: ConfigureEvm,
205{
206    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
207    const fn new(
208        node_head: BlockNumHash,
209        provider: P,
210        evm_config: E,
211        notifications: Receiver<ExExNotification<E::Primitives>>,
212        wal_handle: WalHandle<E::Primitives>,
213    ) -> Self {
214        Self { node_head, provider, evm_config, notifications, wal_handle }
215    }
216
217    /// Subscribe to notifications with the given head.
218    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
219        ExExNotificationsWithHead::new(
220            self.node_head,
221            self.provider,
222            self.evm_config,
223            self.notifications,
224            self.wal_handle,
225            head,
226        )
227    }
228}
229
230impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
231where
232    E: ConfigureEvm,
233{
234    type Item = ExExNotification<E::Primitives>;
235
236    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237        self.get_mut().notifications.poll_recv(cx)
238    }
239}
240
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
/// committed or reverted after the given head. The head is the ExEx's latest view of the host
/// chain.
///
/// Notifications will be sent starting from the head, not inclusive. For example, if
/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
/// process block 11.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<P, E>
where
    E: ConfigureEvm,
{
    /// The node's local head at launch.
    initial_local_head: BlockNumHash,
    /// Provider used to check whether the ExEx head block is known and to build the backfill job.
    provider: P,
    /// EVM configuration passed to the backfill job factory.
    evm_config: E,
    /// Receiving half of the channel the live notifications arrive on.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Handle to the WAL, used to look up the committed notification of a non-canonical ExEx
    /// head.
    wal_handle: WalHandle<E::Primitives>,
    /// The exex head at launch
    ///
    /// The canonical check rewinds its block to the parent of the reverted chain whenever it
    /// emits a revert.
    initial_exex_head: ExExHead,

    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
    /// revert its head.
    pending_check_canonical: bool,
    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
    /// the missing blocks.
    pending_check_backfill: bool,
    /// The backfill job to run before consuming any notifications.
    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
}
272
impl<P, E> ExExNotificationsWithHead<P, E>
where
    E: ConfigureEvm,
{
    /// Creates a new [`ExExNotificationsWithHead`].
    ///
    /// `node_head` is stored as the initial local head and `exex_head` as the initial ExEx head.
    /// The stream starts with both the canonical check and the backfill check pending and with
    /// no backfill job.
    const fn new(
        node_head: BlockNumHash,
        provider: P,
        evm_config: E,
        notifications: Receiver<ExExNotification<E::Primitives>>,
        wal_handle: WalHandle<E::Primitives>,
        exex_head: ExExHead,
    ) -> Self {
        Self {
            initial_local_head: node_head,
            provider,
            evm_config,
            notifications,
            wal_handle,
            initial_exex_head: exex_head,
            pending_check_canonical: true,
            pending_check_backfill: true,
            backfill_job: None,
        }
    }
}
299
300impl<P, E> ExExNotificationsWithHead<P, E>
301where
302    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
303    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
304{
305    /// Checks if the ExEx head is on the canonical chain.
306    ///
307    /// If the head block is not found in the database or it's ahead of the node head, it means
308    /// we're not on the canonical chain and we need to revert the notification with the ExEx
309    /// head block.
310    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
311        if self.provider.is_known(self.initial_exex_head.block.hash)? &&
312            self.initial_exex_head.block.number <= self.initial_local_head.number
313        {
314            // we have the targeted block and that block is below the current head
315            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
316            return Ok(None)
317        }
318
319        // If the head block is not found in the database, it means we're not on the canonical
320        // chain.
321
322        // Get the committed notification for the head block from the WAL.
323        let Some(notification) = self
324            .wal_handle
325            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
326        else {
327            // it's possible that the exex head is further ahead
328            if self.initial_exex_head.block.number > self.initial_local_head.number {
329                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
330                return Ok(None);
331            }
332
333            return Err(eyre::eyre!(
334                "Could not find notification for block hash {:?} in the WAL",
335                self.initial_exex_head.block.hash
336            ))
337        };
338
339        // Update the head block hash to the parent hash of the first committed block.
340        let committed_chain = notification.committed_chain().unwrap();
341        let new_exex_head =
342            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
343        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
344        self.initial_exex_head.block = new_exex_head;
345
346        // Return an inverted notification. See the documentation for
347        // `ExExNotification::into_inverted`.
348        Ok(Some(notification.into_inverted()))
349    }
350
351    /// Compares the node head against the ExEx head, and backfills if needed.
352    ///
353    /// CAUTION: This method assumes that the ExEx head is <= the node head, and that it's on the
354    /// canonical chain.
355    ///
356    /// Possible situations are:
357    /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
358    ///   node database.
359    /// - ExEx is at the same block number as the node head (`node_head.number ==
360    ///   exex_head.number`). Nothing to do.
361    fn check_backfill(&mut self) -> eyre::Result<()> {
362        let backfill_job_factory =
363            BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
364        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
365            std::cmp::Ordering::Less => {
366                // ExEx is behind the node head, start backfill
367                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
368                let backfill = backfill_job_factory
369                    .backfill(
370                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
371                    )
372                    .into_stream();
373                self.backfill_job = Some(backfill);
374            }
375            std::cmp::Ordering::Equal => {
376                debug!(target: "exex::notifications", "ExEx is at the node head");
377            }
378            std::cmp::Ordering::Greater => {
379                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
380            }
381        };
382
383        Ok(())
384    }
385}
386
387impl<P, E> Stream for ExExNotificationsWithHead<P, E>
388where
389    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
390    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
391{
392    type Item = eyre::Result<ExExNotification<E::Primitives>>;
393
394    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395        let this = self.get_mut();
396
397        // 1. Check once whether we need to retrieve a notification gap from the WAL.
398        if this.pending_check_canonical {
399            if let Some(canonical_notification) = this.check_canonical()? {
400                return Poll::Ready(Some(Ok(canonical_notification)))
401            }
402
403            // ExEx head is on the canonical chain, we no longer need to check it
404            this.pending_check_canonical = false;
405        }
406
407        // 2. Check once whether we need to trigger backfill sync
408        if this.pending_check_backfill {
409            this.check_backfill()?;
410            this.pending_check_backfill = false;
411        }
412
413        // 3. If backfill is in progress yield new notifications
414        if let Some(backfill_job) = &mut this.backfill_job {
415            debug!(target: "exex::notifications", "Polling backfill job");
416            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
417                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
418                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
419                    new: Arc::new(chain),
420                })))
421            }
422
423            // Backfill job is done, remove it
424            this.backfill_job = None;
425        }
426
427        // 4. Otherwise advance the regular event stream
428        loop {
429            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
430                return Poll::Ready(None)
431            };
432
433            // 5. In case the exex is ahead of the new tip, we must skip it
434            if let Some(committed) = notification.committed_chain() {
435                // inclusive check because we should start with `exex.head + 1`
436                if this.initial_exex_head.block.number >= committed.tip().number() {
437                    continue
438                }
439            }
440
441            return Poll::Ready(Some(Ok(notification)))
442        }
443    }
444}
445
446#[cfg(test)]
447mod tests {
448    use super::*;
449    use crate::Wal;
450    use alloy_consensus::Header;
451    use alloy_eips::BlockNumHash;
452    use eyre::OptionExt;
453    use futures::StreamExt;
454    use reth_db_common::init::init_genesis;
455    use reth_ethereum_primitives::Block;
456    use reth_evm_ethereum::EthEvmConfig;
457    use reth_primitives_traits::Block as _;
458    use reth_provider::{
459        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
460        Chain, DBProvider, DatabaseProviderFactory,
461    };
462    use reth_testing_utils::generators::{self, random_block, BlockParams};
463    use tokio::sync::mpsc;
464
    /// The ExEx head is the genesis block and the node head is one block above it in the
    /// database: the stream must first yield the backfilled block 1 and then the notification
    /// that was queued on the channel.
    #[tokio::test]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory.clone())?;

        // Insert an empty child of genesis into the database; it becomes the node head.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw.insert_block(node_head_block.clone().try_recover()?)?;
        provider_rw.commit()?;

        let node_head = node_head_block.num_hash();
        // The ExEx is one block behind: it has only seen genesis.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // Live notification committing a block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        // Queue the notification before the stream is polled.
        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the backfill of missing blocks from the canonical chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.evm_config.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // Second notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(notification));

        Ok(())
    }
540
    /// The ExEx head equals the node head (both genesis): nothing is reverted or backfilled and
    /// the first item is the notification that was queued on the channel.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Node and ExEx share the same head.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: node_head };

        // Live notification committing a default block on top of the shared head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Block {
                    header: Header {
                        parent_hash: node_head.hash,
                        number: node_head.number + 1,
                        ..Default::default()
                    },
                    ..Default::default()
                }
                .seal_slow()
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        // Queue the notification before the stream is polled.
        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The queued notification comes through unchanged as the first item.
        let new_notification = notifications.next().await.transpose()?;
        assert_eq!(new_notification, Some(notification));

        Ok(())
    }
592
    /// The ExEx head is at the same height as the node head but is a different block that was
    /// only committed to the WAL: the stream must revert it, backfill the node head block and
    /// then yield the queued notification.
    #[tokio::test]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Insert an empty child of genesis into the database; it becomes the node head.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(node_head_block)?;
        provider_rw.commit()?;
        // The notification the backfill is expected to produce for the node head block.
        let node_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(
                BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
                    .backfill(node_head.number..=node_head.number)
                    .next()
                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
            ),
        };

        // A second random child of genesis that is never inserted into the database; its commit
        // is recorded in the WAL only.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // Live notification committing a block on top of the node head.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        // Queue the notification before the stream is polled.
        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block to get back to the canonical
        // chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );
        // Second notification is the backfilled block from the canonical chain to get back to the
        // canonical tip
        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
        // Third notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
682
    /// The ExEx head is one block above the node head (genesis) and was only committed to the
    /// WAL: the stream must revert that block and then yield the queued notification.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // A random child of genesis that is never inserted into the database; its commit is
        // recorded in the WAL only.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node is still at genesis while the ExEx claims to be at the block above it.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // Live notification committing another random child of genesis.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    genesis_block.number + 1,
                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        // Queue the notification before the stream is polled.
        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First notification is the revert of the ExEx head block to get back to the canonical
        // chain
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );

        // Second notification is the actual notification that we sent before
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
755    }
756}