reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::execute::BlockExecutorProvider;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12    fmt::Debug,
13    pin::Pin,
14    sync::Arc,
15    task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
20/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
21/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
22#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25    E: BlockExecutorProvider,
26{
27    inner: ExExNotificationsInner<P, E>,
28}
29
30/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
31/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
32/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
33pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
37    ///
38    /// It's a no-op if the stream has already been configured without a head.
39    ///
40    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
41    fn set_without_head(&mut self);
42
43    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
44    /// head.
45    ///
46    /// It's a no-op if the stream has already been configured with a head.
47    ///
48    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
49    fn set_with_head(&mut self, exex_head: ExExHead);
50
51    /// Returns a new [`ExExNotificationsStream`] without a head.
52    ///
53    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
54    fn without_head(self) -> Self
55    where
56        Self: Sized;
57
58    /// Returns a new [`ExExNotificationsStream`] with the provided head.
59    ///
60    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
61    fn with_head(self, exex_head: ExExHead) -> Self
62    where
63        Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69    E: BlockExecutorProvider,
70{
71    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
72    WithoutHead(ExExNotificationsWithoutHead<P, E>),
73    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
74    /// are committed or reverted after the given head.
75    WithHead(Box<ExExNotificationsWithHead<P, E>>),
76    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
77    /// [`ExExNotificationsInner::WithHead`].
78    Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83    E: BlockExecutorProvider,
84{
85    /// Creates a new stream of [`ExExNotifications`] without a head.
86    pub const fn new(
87        node_head: BlockNumHash,
88        provider: P,
89        executor: E,
90        notifications: Receiver<ExExNotification<E::Primitives>>,
91        wal_handle: WalHandle<E::Primitives>,
92    ) -> Self {
93        Self {
94            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95                node_head,
96                provider,
97                executor,
98                notifications,
99                wal_handle,
100            )),
101        }
102    }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
109        + Clone
110        + Unpin
111        + 'static,
112{
113    fn set_without_head(&mut self) {
114        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
115        self.inner = ExExNotificationsInner::WithoutHead(match current {
116            ExExNotificationsInner::WithoutHead(notifications) => notifications,
117            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
118                notifications.initial_local_head,
119                notifications.provider,
120                notifications.executor,
121                notifications.notifications,
122                notifications.wal_handle,
123            ),
124            ExExNotificationsInner::Invalid => unreachable!(),
125        });
126    }
127
128    fn set_with_head(&mut self, exex_head: ExExHead) {
129        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
130        self.inner = ExExNotificationsInner::WithHead(match current {
131            ExExNotificationsInner::WithoutHead(notifications) => {
132                Box::new(notifications.with_head(exex_head))
133            }
134            ExExNotificationsInner::WithHead(notifications) => {
135                Box::new(ExExNotificationsWithHead::new(
136                    notifications.initial_local_head,
137                    notifications.provider,
138                    notifications.executor,
139                    notifications.notifications,
140                    notifications.wal_handle,
141                    exex_head,
142                ))
143            }
144            ExExNotificationsInner::Invalid => unreachable!(),
145        });
146    }
147
148    fn without_head(mut self) -> Self {
149        self.set_without_head();
150        self
151    }
152
153    fn with_head(mut self, exex_head: ExExHead) -> Self {
154        self.set_with_head(exex_head);
155        self
156    }
157}
158
159impl<P, E> Stream for ExExNotifications<P, E>
160where
161    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
162    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
163        + Clone
164        + Unpin
165        + 'static,
166{
167    type Item = eyre::Result<ExExNotification<E::Primitives>>;
168
169    fn poll_next(
170        self: std::pin::Pin<&mut Self>,
171        cx: &mut std::task::Context<'_>,
172    ) -> std::task::Poll<Option<Self::Item>> {
173        match &mut self.get_mut().inner {
174            ExExNotificationsInner::WithoutHead(notifications) => {
175                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
176            }
177            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
178            ExExNotificationsInner::Invalid => unreachable!(),
179        }
180    }
181}
182
183/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
184pub struct ExExNotificationsWithoutHead<P, E>
185where
186    E: BlockExecutorProvider,
187{
188    node_head: BlockNumHash,
189    provider: P,
190    executor: E,
191    notifications: Receiver<ExExNotification<E::Primitives>>,
192    wal_handle: WalHandle<E::Primitives>,
193}
194
195impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
196where
197    E: Debug + BlockExecutorProvider,
198{
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("ExExNotifications")
201            .field("provider", &self.provider)
202            .field("executor", &self.executor)
203            .field("notifications", &self.notifications)
204            .finish()
205    }
206}
207
208impl<P, E> ExExNotificationsWithoutHead<P, E>
209where
210    E: BlockExecutorProvider,
211{
212    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
213    const fn new(
214        node_head: BlockNumHash,
215        provider: P,
216        executor: E,
217        notifications: Receiver<ExExNotification<E::Primitives>>,
218        wal_handle: WalHandle<E::Primitives>,
219    ) -> Self {
220        Self { node_head, provider, executor, notifications, wal_handle }
221    }
222
223    /// Subscribe to notifications with the given head.
224    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
225        ExExNotificationsWithHead::new(
226            self.node_head,
227            self.provider,
228            self.executor,
229            self.notifications,
230            self.wal_handle,
231            head,
232        )
233    }
234}
235
236impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
237where
238    E: Unpin + BlockExecutorProvider,
239{
240    type Item = ExExNotification<E::Primitives>;
241
242    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        self.get_mut().notifications.poll_recv(cx)
244    }
245}
246
247/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
248/// committed or reverted after the given head. The head is the ExEx's latest view of the host
249/// chain.
250///
251/// Notifications will be sent starting from the head, not inclusive. For example, if
252/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
253/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
254/// process block 11.
255#[derive(Debug)]
256pub struct ExExNotificationsWithHead<P, E>
257where
258    E: BlockExecutorProvider,
259{
260    /// The node's local head at launch.
261    initial_local_head: BlockNumHash,
262    provider: P,
263    executor: E,
264    notifications: Receiver<ExExNotification<E::Primitives>>,
265    wal_handle: WalHandle<E::Primitives>,
266    /// The exex head at launch
267    initial_exex_head: ExExHead,
268
269    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
270    /// revert its head.
271    pending_check_canonical: bool,
272    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
273    /// the missing blocks.
274    pending_check_backfill: bool,
275    /// The backfill job to run before consuming any notifications.
276    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
277}
278
279impl<P, E> ExExNotificationsWithHead<P, E>
280where
281    E: BlockExecutorProvider,
282{
283    /// Creates a new [`ExExNotificationsWithHead`].
284    const fn new(
285        node_head: BlockNumHash,
286        provider: P,
287        executor: E,
288        notifications: Receiver<ExExNotification<E::Primitives>>,
289        wal_handle: WalHandle<E::Primitives>,
290        exex_head: ExExHead,
291    ) -> Self {
292        Self {
293            initial_local_head: node_head,
294            provider,
295            executor,
296            notifications,
297            wal_handle,
298            initial_exex_head: exex_head,
299            pending_check_canonical: true,
300            pending_check_backfill: true,
301            backfill_job: None,
302        }
303    }
304}
305
306impl<P, E> ExExNotificationsWithHead<P, E>
307where
308    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
309    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
310        + Clone
311        + Unpin
312        + 'static,
313{
314    /// Checks if the ExEx head is on the canonical chain.
315    ///
316    /// If the head block is not found in the database or it's ahead of the node head, it means
317    /// we're not on the canonical chain and we need to revert the notification with the ExEx
318    /// head block.
319    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
320        if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
321            self.initial_exex_head.block.number <= self.initial_local_head.number
322        {
323            // we have the targeted block and that block is below the current head
324            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
325            return Ok(None)
326        }
327
328        // If the head block is not found in the database, it means we're not on the canonical
329        // chain.
330
331        // Get the committed notification for the head block from the WAL.
332        let Some(notification) = self
333            .wal_handle
334            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
335        else {
336            // it's possible that the exex head is further ahead
337            if self.initial_exex_head.block.number > self.initial_local_head.number {
338                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
339                return Ok(None);
340            }
341
342            return Err(eyre::eyre!(
343                "Could not find notification for block hash {:?} in the WAL",
344                self.initial_exex_head.block.hash
345            ))
346        };
347
348        // Update the head block hash to the parent hash of the first committed block.
349        let committed_chain = notification.committed_chain().unwrap();
350        let new_exex_head =
351            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
352        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
353        self.initial_exex_head.block = new_exex_head;
354
355        // Return an inverted notification. See the documentation for
356        // `ExExNotification::into_inverted`.
357        Ok(Some(notification.into_inverted()))
358    }
359
360    /// Compares the node head against the ExEx head, and backfills if needed.
361    ///
362    /// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the
363    /// canonical chain.
364    ///
365    /// Possible situations are:
366    /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
367    ///   node database.
368    /// - ExEx is at the same block number as the node head (`node_head.number ==
369    ///   exex_head.number`). Nothing to do.
370    fn check_backfill(&mut self) -> eyre::Result<()> {
371        let backfill_job_factory =
372            BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
373        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
374            std::cmp::Ordering::Less => {
375                // ExEx is behind the node head, start backfill
376                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
377                let backfill = backfill_job_factory
378                    .backfill(
379                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
380                    )
381                    .into_stream();
382                self.backfill_job = Some(backfill);
383            }
384            std::cmp::Ordering::Equal => {
385                debug!(target: "exex::notifications", "ExEx is at the node head");
386            }
387            std::cmp::Ordering::Greater => {
388                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
389            }
390        };
391
392        Ok(())
393    }
394}
395
396impl<P, E> Stream for ExExNotificationsWithHead<P, E>
397where
398    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
399    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
400        + Clone
401        + Unpin
402        + 'static,
403{
404    type Item = eyre::Result<ExExNotification<E::Primitives>>;
405
406    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
407        let this = self.get_mut();
408
409        // 1. Check once whether we need to retrieve a notification gap from the WAL.
410        if this.pending_check_canonical {
411            if let Some(canonical_notification) = this.check_canonical()? {
412                return Poll::Ready(Some(Ok(canonical_notification)))
413            }
414
415            // ExEx head is on the canonical chain, we no longer need to check it
416            this.pending_check_canonical = false;
417        }
418
419        // 2. Check once whether we need to trigger backfill sync
420        if this.pending_check_backfill {
421            this.check_backfill()?;
422            this.pending_check_backfill = false;
423        }
424
425        // 3. If backfill is in progress yield new notifications
426        if let Some(backfill_job) = &mut this.backfill_job {
427            debug!(target: "exex::notifications", "Polling backfill job");
428            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
429                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
430                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
431                    new: Arc::new(chain),
432                })))
433            }
434
435            // Backfill job is done, remove it
436            this.backfill_job = None;
437        }
438
439        // 4. Otherwise advance the regular event stream
440        loop {
441            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
442                return Poll::Ready(None)
443            };
444
445            // 5. In case the exex is ahead of the new tip, we must skip it
446            if let Some(committed) = notification.committed_chain() {
447                // inclusive check because we should start with `exex.head + 1`
448                if this.initial_exex_head.block.number >= committed.tip().number() {
449                    continue
450                }
451            }
452
453            return Poll::Ready(Some(Ok(notification)))
454        }
455    }
456}
457
458#[cfg(test)]
459mod tests {
460    use super::*;
461    use crate::Wal;
462    use alloy_consensus::Header;
463    use alloy_eips::BlockNumHash;
464    use eyre::OptionExt;
465    use futures::StreamExt;
466    use reth_db_common::init::init_genesis;
467    use reth_ethereum_primitives::Block;
468    use reth_evm_ethereum::execute::EthExecutorProvider;
469    use reth_primitives_traits::Block as _;
470    use reth_provider::{
471        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
472        Chain, DatabaseProviderFactory, StorageLocation,
473    };
474    use reth_testing_utils::generators::{self, random_block, BlockParams};
475    use tokio::sync::mpsc;
476
477    #[tokio::test]
478    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
479        let mut rng = generators::rng();
480
481        let temp_dir = tempfile::tempdir().unwrap();
482        let wal = Wal::new(temp_dir.path()).unwrap();
483
484        let provider_factory = create_test_provider_factory();
485        let genesis_hash = init_genesis(&provider_factory)?;
486        let genesis_block = provider_factory
487            .block(genesis_hash.into())?
488            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
489
490        let provider = BlockchainProvider::new(provider_factory.clone())?;
491
492        let node_head_block = random_block(
493            &mut rng,
494            genesis_block.number + 1,
495            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
496        );
497        let provider_rw = provider_factory.provider_rw()?;
498        provider_rw
499            .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
500        provider_rw.commit()?;
501
502        let node_head = node_head_block.num_hash();
503        let exex_head =
504            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
505
506        let notification = ExExNotification::ChainCommitted {
507            new: Arc::new(Chain::new(
508                vec![random_block(
509                    &mut rng,
510                    node_head.number + 1,
511                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
512                )
513                .try_recover()?],
514                Default::default(),
515                None,
516            )),
517        };
518
519        let (notifications_tx, notifications_rx) = mpsc::channel(1);
520
521        notifications_tx.send(notification.clone()).await?;
522
523        let mut notifications = ExExNotificationsWithoutHead::new(
524            node_head,
525            provider,
526            EthExecutorProvider::mainnet(),
527            notifications_rx,
528            wal.handle(),
529        )
530        .with_head(exex_head);
531
532        // First notification is the backfill of missing blocks from the canonical chain
533        assert_eq!(
534            notifications.next().await.transpose()?,
535            Some(ExExNotification::ChainCommitted {
536                new: Arc::new(
537                    BackfillJobFactory::new(
538                        notifications.executor.clone(),
539                        notifications.provider.clone()
540                    )
541                    .backfill(1..=1)
542                    .next()
543                    .ok_or_eyre("failed to backfill")??
544                )
545            })
546        );
547
548        // Second notification is the actual notification that we sent before
549        assert_eq!(notifications.next().await.transpose()?, Some(notification));
550
551        Ok(())
552    }
553
554    #[tokio::test]
555    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
556        let temp_dir = tempfile::tempdir().unwrap();
557        let wal = Wal::new(temp_dir.path()).unwrap();
558
559        let provider_factory = create_test_provider_factory();
560        let genesis_hash = init_genesis(&provider_factory)?;
561        let genesis_block = provider_factory
562            .block(genesis_hash.into())?
563            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
564
565        let provider = BlockchainProvider::new(provider_factory)?;
566
567        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
568        let exex_head = ExExHead { block: node_head };
569
570        let notification = ExExNotification::ChainCommitted {
571            new: Arc::new(Chain::new(
572                vec![Block {
573                    header: Header {
574                        parent_hash: node_head.hash,
575                        number: node_head.number + 1,
576                        ..Default::default()
577                    },
578                    ..Default::default()
579                }
580                .seal_slow()
581                .try_recover()?],
582                Default::default(),
583                None,
584            )),
585        };
586
587        let (notifications_tx, notifications_rx) = mpsc::channel(1);
588
589        notifications_tx.send(notification.clone()).await?;
590
591        let mut notifications = ExExNotificationsWithoutHead::new(
592            node_head,
593            provider,
594            EthExecutorProvider::mainnet(),
595            notifications_rx,
596            wal.handle(),
597        )
598        .with_head(exex_head);
599
600        let new_notification = notifications.next().await.transpose()?;
601        assert_eq!(new_notification, Some(notification));
602
603        Ok(())
604    }
605
606    #[tokio::test]
607    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
608        let mut rng = generators::rng();
609
610        let temp_dir = tempfile::tempdir().unwrap();
611        let wal = Wal::new(temp_dir.path()).unwrap();
612
613        let provider_factory = create_test_provider_factory();
614        let genesis_hash = init_genesis(&provider_factory)?;
615        let genesis_block = provider_factory
616            .block(genesis_hash.into())?
617            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
618
619        let provider = BlockchainProvider::new(provider_factory)?;
620
621        let node_head_block = random_block(
622            &mut rng,
623            genesis_block.number + 1,
624            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
625        )
626        .try_recover()?;
627        let node_head = node_head_block.num_hash();
628        let provider_rw = provider.database_provider_rw()?;
629        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
630        provider_rw.commit()?;
631        let node_head_notification = ExExNotification::ChainCommitted {
632            new: Arc::new(
633                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
634                    .backfill(node_head.number..=node_head.number)
635                    .next()
636                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
637            ),
638        };
639
640        let exex_head_block = random_block(
641            &mut rng,
642            genesis_block.number + 1,
643            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
644        );
645        let exex_head = ExExHead { block: exex_head_block.num_hash() };
646        let exex_head_notification = ExExNotification::ChainCommitted {
647            new: Arc::new(Chain::new(
648                vec![exex_head_block.clone().try_recover()?],
649                Default::default(),
650                None,
651            )),
652        };
653        wal.commit(&exex_head_notification)?;
654
655        let new_notification = ExExNotification::ChainCommitted {
656            new: Arc::new(Chain::new(
657                vec![random_block(
658                    &mut rng,
659                    node_head.number + 1,
660                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
661                )
662                .try_recover()?],
663                Default::default(),
664                None,
665            )),
666        };
667
668        let (notifications_tx, notifications_rx) = mpsc::channel(1);
669
670        notifications_tx.send(new_notification.clone()).await?;
671
672        let mut notifications = ExExNotificationsWithoutHead::new(
673            node_head,
674            provider,
675            EthExecutorProvider::mainnet(),
676            notifications_rx,
677            wal.handle(),
678        )
679        .with_head(exex_head);
680
681        // First notification is the revert of the ExEx head block to get back to the canonical
682        // chain
683        assert_eq!(
684            notifications.next().await.transpose()?,
685            Some(exex_head_notification.into_inverted())
686        );
687        // Second notification is the backfilled block from the canonical chain to get back to the
688        // canonical tip
689        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
690        // Third notification is the actual notification that we sent before
691        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
692
693        Ok(())
694    }
695
696    #[tokio::test]
697    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
698        reth_tracing::init_test_tracing();
699        let mut rng = generators::rng();
700
701        let temp_dir = tempfile::tempdir().unwrap();
702        let wal = Wal::new(temp_dir.path()).unwrap();
703
704        let provider_factory = create_test_provider_factory();
705        let genesis_hash = init_genesis(&provider_factory)?;
706        let genesis_block = provider_factory
707            .block(genesis_hash.into())?
708            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
709
710        let provider = BlockchainProvider::new(provider_factory)?;
711
712        let exex_head_block = random_block(
713            &mut rng,
714            genesis_block.number + 1,
715            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
716        );
717        let exex_head_notification = ExExNotification::ChainCommitted {
718            new: Arc::new(Chain::new(
719                vec![exex_head_block.clone().try_recover()?],
720                Default::default(),
721                None,
722            )),
723        };
724        wal.commit(&exex_head_notification)?;
725
726        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
727        let exex_head = ExExHead {
728            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
729        };
730
731        let new_notification = ExExNotification::ChainCommitted {
732            new: Arc::new(Chain::new(
733                vec![random_block(
734                    &mut rng,
735                    genesis_block.number + 1,
736                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
737                )
738                .try_recover()?],
739                Default::default(),
740                None,
741            )),
742        };
743
744        let (notifications_tx, notifications_rx) = mpsc::channel(1);
745
746        notifications_tx.send(new_notification.clone()).await?;
747
748        let mut notifications = ExExNotificationsWithoutHead::new(
749            node_head,
750            provider,
751            EthExecutorProvider::mainnet(),
752            notifications_rx,
753            wal.handle(),
754        )
755        .with_head(exex_head);
756
757        // First notification is the revert of the ExEx head block to get back to the canonical
758        // chain
759        assert_eq!(
760            notifications.next().await.transpose()?,
761            Some(exex_head_notification.into_inverted())
762        );
763
764        // Second notification is the actual notification that we sent before
765        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
766
767        Ok(())
768    }
769}