Skip to main content

reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_stages_api::ExecutionStageThresholds;
11use reth_tracing::tracing::debug;
12use std::{
13    fmt::Debug,
14    pin::Pin,
15    sync::Arc,
16    task::{ready, Context, Poll},
17};
18use tokio::sync::mpsc::Receiver;
19
20/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
21/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
22/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
23#[derive(Debug)]
24pub struct ExExNotifications<P, E>
25where
26    E: ConfigureEvm,
27{
28    inner: ExExNotificationsInner<P, E>,
29}
30
31/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
32/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
33/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
34pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
35    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
36{
37    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
38    ///
39    /// It's a no-op if the stream has already been configured without a head.
40    ///
41    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
42    fn set_without_head(&mut self);
43
44    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
45    /// head.
46    ///
47    /// It's a no-op if the stream has already been configured with a head.
48    ///
49    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
50    fn set_with_head(&mut self, exex_head: ExExHead);
51
52    /// Returns a new [`ExExNotificationsStream`] without a head.
53    ///
54    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
55    fn without_head(self) -> Self
56    where
57        Self: Sized;
58
59    /// Returns a new [`ExExNotificationsStream`] with the provided head.
60    ///
61    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
62    fn with_head(self, exex_head: ExExHead) -> Self
63    where
64        Self: Sized;
65
66    /// Sets custom thresholds for the backfill job.
67    ///
68    /// These thresholds control how many blocks are included in each backfill notification.
69    /// Only takes effect when the stream is configured with a head.
70    ///
71    /// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
72    /// per batch, bounded by 30s execution time).
73    fn set_backfill_thresholds(&mut self, _thresholds: ExecutionStageThresholds) {}
74}
75
76#[derive(Debug)]
77enum ExExNotificationsInner<P, E>
78where
79    E: ConfigureEvm,
80{
81    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
82    WithoutHead(ExExNotificationsWithoutHead<P, E>),
83    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
84    /// are committed or reverted after the given head.
85    WithHead(Box<ExExNotificationsWithHead<P, E>>),
86    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
87    /// [`ExExNotificationsInner::WithHead`].
88    Invalid,
89}
90
91impl<P, E> ExExNotifications<P, E>
92where
93    E: ConfigureEvm,
94{
95    /// Creates a new stream of [`ExExNotifications`] without a head.
96    pub const fn new(
97        node_head: BlockNumHash,
98        provider: P,
99        evm_config: E,
100        notifications: Receiver<ExExNotification<E::Primitives>>,
101        wal_handle: WalHandle<E::Primitives>,
102    ) -> Self {
103        Self {
104            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
105                node_head,
106                provider,
107                evm_config,
108                notifications,
109                wal_handle,
110            )),
111        }
112    }
113}
114
115impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
116where
117    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
118    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
119{
120    fn set_without_head(&mut self) {
121        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
122        self.inner = ExExNotificationsInner::WithoutHead(match current {
123            ExExNotificationsInner::WithoutHead(notifications) => notifications,
124            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
125                notifications.initial_local_head,
126                notifications.provider,
127                notifications.evm_config,
128                notifications.notifications,
129                notifications.wal_handle,
130            ),
131            ExExNotificationsInner::Invalid => unreachable!(),
132        });
133    }
134
135    fn set_with_head(&mut self, exex_head: ExExHead) {
136        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
137        self.inner = ExExNotificationsInner::WithHead(match current {
138            ExExNotificationsInner::WithoutHead(notifications) => {
139                Box::new(notifications.with_head(exex_head))
140            }
141            ExExNotificationsInner::WithHead(notifications) => {
142                Box::new(ExExNotificationsWithHead::new(
143                    notifications.initial_local_head,
144                    notifications.provider,
145                    notifications.evm_config,
146                    notifications.notifications,
147                    notifications.wal_handle,
148                    exex_head,
149                ))
150            }
151            ExExNotificationsInner::Invalid => unreachable!(),
152        });
153    }
154
155    fn without_head(mut self) -> Self {
156        self.set_without_head();
157        self
158    }
159
160    fn with_head(mut self, exex_head: ExExHead) -> Self {
161        self.set_with_head(exex_head);
162        self
163    }
164
165    fn set_backfill_thresholds(&mut self, thresholds: ExecutionStageThresholds) {
166        if let ExExNotificationsInner::WithHead(notifications) = &mut self.inner {
167            notifications.backfill_thresholds = Some(thresholds);
168        }
169    }
170}
171
172impl<P, E> Stream for ExExNotifications<P, E>
173where
174    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
175    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
176{
177    type Item = eyre::Result<ExExNotification<E::Primitives>>;
178
179    fn poll_next(
180        self: std::pin::Pin<&mut Self>,
181        cx: &mut std::task::Context<'_>,
182    ) -> std::task::Poll<Option<Self::Item>> {
183        match &mut self.get_mut().inner {
184            ExExNotificationsInner::WithoutHead(notifications) => {
185                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
186            }
187            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
188            ExExNotificationsInner::Invalid => unreachable!(),
189        }
190    }
191}
192
193/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
194pub struct ExExNotificationsWithoutHead<P, E>
195where
196    E: ConfigureEvm,
197{
198    node_head: BlockNumHash,
199    provider: P,
200    evm_config: E,
201    notifications: Receiver<ExExNotification<E::Primitives>>,
202    wal_handle: WalHandle<E::Primitives>,
203}
204
205impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
206where
207    E: ConfigureEvm + Debug,
208{
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("ExExNotifications")
211            .field("provider", &self.provider)
212            .field("evm_config", &self.evm_config)
213            .field("notifications", &self.notifications)
214            .finish()
215    }
216}
217
218impl<P, E> ExExNotificationsWithoutHead<P, E>
219where
220    E: ConfigureEvm,
221{
222    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
223    const fn new(
224        node_head: BlockNumHash,
225        provider: P,
226        evm_config: E,
227        notifications: Receiver<ExExNotification<E::Primitives>>,
228        wal_handle: WalHandle<E::Primitives>,
229    ) -> Self {
230        Self { node_head, provider, evm_config, notifications, wal_handle }
231    }
232
233    /// Subscribe to notifications with the given head.
234    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
235        ExExNotificationsWithHead::new(
236            self.node_head,
237            self.provider,
238            self.evm_config,
239            self.notifications,
240            self.wal_handle,
241            head,
242        )
243    }
244}
245
246impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
247where
248    E: ConfigureEvm,
249{
250    type Item = ExExNotification<E::Primitives>;
251
252    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253        self.get_mut().notifications.poll_recv(cx)
254    }
255}
256
257/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
258/// committed or reverted after the given head. The head is the ExEx's latest view of the host
259/// chain.
260///
261/// Notifications will be sent starting from the head, not inclusive. For example, if
262/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
263/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
264/// process block 11.
265#[derive(Debug)]
266pub struct ExExNotificationsWithHead<P, E>
267where
268    E: ConfigureEvm,
269{
270    /// The node's local head at launch.
271    initial_local_head: BlockNumHash,
272    provider: P,
273    evm_config: E,
274    notifications: Receiver<ExExNotification<E::Primitives>>,
275    wal_handle: WalHandle<E::Primitives>,
276    /// The exex head at launch
277    initial_exex_head: ExExHead,
278
279    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
280    /// revert its head.
281    pending_check_canonical: bool,
282    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
283    /// the missing blocks.
284    pending_check_backfill: bool,
285    /// The backfill job to run before consuming any notifications.
286    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
287    /// Custom thresholds for the backfill job, if set.
288    backfill_thresholds: Option<ExecutionStageThresholds>,
289}
290
291impl<P, E> ExExNotificationsWithHead<P, E>
292where
293    E: ConfigureEvm,
294{
295    /// Creates a new [`ExExNotificationsWithHead`].
296    const fn new(
297        node_head: BlockNumHash,
298        provider: P,
299        evm_config: E,
300        notifications: Receiver<ExExNotification<E::Primitives>>,
301        wal_handle: WalHandle<E::Primitives>,
302        exex_head: ExExHead,
303    ) -> Self {
304        Self {
305            initial_local_head: node_head,
306            provider,
307            evm_config,
308            notifications,
309            wal_handle,
310            initial_exex_head: exex_head,
311            pending_check_canonical: true,
312            pending_check_backfill: true,
313            backfill_job: None,
314            backfill_thresholds: None,
315        }
316    }
317
318    /// Sets custom thresholds for the backfill job.
319    ///
320    /// These thresholds control how many blocks are included in each backfill notification.
321    /// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
322    /// per batch, bounded by 30s execution time).
323    ///
324    /// If your ExEx is memory-constrained, consider setting a lower `max_blocks` value to
325    /// reduce the size of each backfill notification.
326    pub const fn with_backfill_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
327        self.backfill_thresholds = Some(thresholds);
328        self
329    }
330}
331
332impl<P, E> ExExNotificationsWithHead<P, E>
333where
334    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
335    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
336{
337    /// Checks if the ExEx head is on the canonical chain.
338    ///
339    /// If the head block is not found in the database or it's ahead of the node head, it means
340    /// we're not on the canonical chain and we need to revert the notification with the ExEx
341    /// head block.
342    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
343        if self.provider.is_known(self.initial_exex_head.block.hash)? &&
344            self.initial_exex_head.block.number <= self.initial_local_head.number
345        {
346            // we have the targeted block and that block is below the current head
347            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
348            return Ok(None)
349        }
350
351        // If the head block is not found in the database, it means we're not on the canonical
352        // chain.
353
354        // Get the committed notification for the head block from the WAL.
355        let Some(notification) = self
356            .wal_handle
357            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
358        else {
359            // it's possible that the exex head is further ahead
360            if self.initial_exex_head.block.number > self.initial_local_head.number {
361                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
362                return Ok(None);
363            }
364
365            return Err(eyre::eyre!(
366                "Could not find notification for block hash {:?} in the WAL",
367                self.initial_exex_head.block.hash
368            ))
369        };
370
371        // Update the head block hash to the parent hash of the first committed block.
372        let committed_chain = notification.committed_chain().unwrap();
373        let new_exex_head =
374            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
375        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
376        self.initial_exex_head.block = new_exex_head;
377
378        // Return an inverted notification. See the documentation for
379        // `ExExNotification::into_inverted`.
380        Ok(Some(notification.into_inverted()))
381    }
382
383    /// Compares the node head against the ExEx head, and backfills if needed.
384    ///
385    /// CAUTION: This method assumes that the ExEx head is <= the node head, and that it's on the
386    /// canonical chain.
387    ///
388    /// Possible situations are:
389    /// - ExEx is behind the node head (`exex_head.number < node_head.number`). Backfill from the
390    ///   node database.
391    /// - ExEx is at the same block number as the node head (`exex_head.number ==
392    ///   node_head.number`). Nothing to do.
393    fn check_backfill(&mut self) -> eyre::Result<()> {
394        let mut backfill_job_factory =
395            BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
396        if let Some(thresholds) = self.backfill_thresholds.clone() {
397            backfill_job_factory = backfill_job_factory.with_thresholds(thresholds);
398        }
399        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
400            std::cmp::Ordering::Less => {
401                // ExEx is behind the node head, start backfill
402                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
403                let backfill = backfill_job_factory
404                    .backfill(
405                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
406                    )
407                    .into_stream();
408                self.backfill_job = Some(backfill);
409            }
410            std::cmp::Ordering::Equal => {
411                debug!(target: "exex::notifications", "ExEx is at the node head");
412            }
413            std::cmp::Ordering::Greater => {
414                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
415            }
416        };
417
418        Ok(())
419    }
420}
421
422impl<P, E> Stream for ExExNotificationsWithHead<P, E>
423where
424    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
425    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
426{
427    type Item = eyre::Result<ExExNotification<E::Primitives>>;
428
429    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
430        let this = self.get_mut();
431
432        // 1. Check once whether we need to retrieve a notification gap from the WAL.
433        if this.pending_check_canonical {
434            if let Some(canonical_notification) = this.check_canonical()? {
435                return Poll::Ready(Some(Ok(canonical_notification)))
436            }
437
438            // ExEx head is on the canonical chain, we no longer need to check it
439            this.pending_check_canonical = false;
440        }
441
442        // 2. Check once whether we need to trigger backfill sync
443        if this.pending_check_backfill {
444            this.check_backfill()?;
445            this.pending_check_backfill = false;
446        }
447
448        // 3. If backfill is in progress yield new notifications
449        if let Some(backfill_job) = &mut this.backfill_job {
450            debug!(target: "exex::notifications", "Polling backfill job");
451            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
452                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
453                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
454                    new: Arc::new(chain),
455                })))
456            }
457
458            // Backfill job is done, remove it
459            this.backfill_job = None;
460        }
461
462        // 4. Otherwise advance the regular event stream
463        loop {
464            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
465                return Poll::Ready(None)
466            };
467
468            // 5. In case the exex is ahead of the new tip, we must skip it
469            if let Some(committed) = notification.committed_chain() {
470                // inclusive check because we should start with `exex.head + 1`
471                if this.initial_exex_head.block.number >= committed.tip().number() {
472                    continue
473                }
474            }
475
476            return Poll::Ready(Some(Ok(notification)))
477        }
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Wal;
    use alloy_consensus::Header;
    use alloy_eips::BlockNumHash;
    use eyre::OptionExt;
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::Block;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::Block as _;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
        Chain, DBProvider, DatabaseProviderFactory,
    };
    use reth_testing_utils::generators::{self, random_block, BlockParams};
    use std::collections::BTreeMap;
    use tokio::sync::mpsc;

    /// ExEx head at genesis, node head one block further: the missing block is backfilled
    /// before the live notification is delivered.
    #[tokio::test]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&factory)?;
        let genesis_block = factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(factory.clone())?;

        // Node head: an empty block on top of genesis, persisted to the database.
        let node_head_params =
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() };
        let node_head_block =
            random_block(&mut rng, genesis_block.number + 1, node_head_params).try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = factory.provider_rw()?;
        provider_rw.insert_block(&node_head_block)?;
        provider_rw.commit()?;

        // The ExEx has only processed genesis.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // Live notification: one block on top of the node head.
        let live_block = random_block(
            &mut rng,
            node_head.number + 1,
            BlockParams { parent: Some(node_head.hash), ..Default::default() },
        )
        .try_recover()?;
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![live_block], Default::default(), BTreeMap::new())),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);
        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The first item has to be the backfill of the block missing from the ExEx's view.
        let first = notifications.next().await.transpose()?;
        let expected_backfill = BackfillJobFactory::new(
            notifications.evm_config.clone(),
            notifications.provider.clone(),
        )
        .backfill(1..=1)
        .next()
        .ok_or_eyre("failed to backfill")??;
        assert_eq!(
            first,
            Some(ExExNotification::ChainCommitted { new: Arc::new(expected_backfill) })
        );

        // The second item is the live notification that was queued up front.
        let second = notifications.next().await.transpose()?;
        assert_eq!(second, Some(notification));

        Ok(())
    }

    /// ExEx head equal to the node head: the live notification is delivered directly.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&factory)?;
        let genesis_block = factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(factory)?;

        // Both heads sit on genesis.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: node_head };

        // Live notification: a default block on top of the node head.
        let live_header = Header {
            parent_hash: node_head.hash,
            number: node_head.number + 1,
            ..Default::default()
        };
        let live_block =
            Block { header: live_header, ..Default::default() }.seal_slow().try_recover()?;
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![live_block], Default::default(), BTreeMap::new())),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);
        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        let received = notifications.next().await.transpose()?;
        assert_eq!(received, Some(notification));

        Ok(())
    }

    /// ExEx head at the node head's height but on a different block: the ExEx block is
    /// reverted, the canonical block is backfilled, then the live notification follows.
    #[tokio::test]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&factory)?;
        let genesis_block = factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(factory)?;

        // Canonical node head: an empty block on top of genesis, persisted to the database.
        let node_head_params =
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() };
        let node_head_block =
            random_block(&mut rng, genesis_block.number + 1, node_head_params).try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(&node_head_block)?;
        provider_rw.commit()?;
        let node_head_chain = BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
            .backfill(node_head.number..=node_head.number)
            .next()
            .ok_or_else(|| eyre::eyre!("failed to backfill"))??;
        let node_head_notification =
            ExExNotification::ChainCommitted { new: Arc::new(node_head_chain) };

        // ExEx head: a sibling block at the same height that only exists in the WAL.
        let exex_head_params =
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() };
        let exex_head_block = random_block(&mut rng, genesis_block.number + 1, exex_head_params);
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };
        wal.commit(&exex_head_notification)?;

        // Live notification: one block on top of the canonical node head.
        let live_block = random_block(
            &mut rng,
            node_head.number + 1,
            BlockParams { parent: Some(node_head.hash), ..Default::default() },
        )
        .try_recover()?;
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![live_block], Default::default(), BTreeMap::new())),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);
        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // 1st item: revert of the ExEx head block, which brings the ExEx back onto the canonical
        // chain.
        let first = notifications.next().await.transpose()?;
        assert_eq!(first, Some(exex_head_notification.into_inverted()));
        // 2nd item: backfill of the canonical block, which brings the ExEx up to the canonical
        // tip.
        let second = notifications.next().await.transpose()?;
        assert_eq!(second, Some(node_head_notification));
        // 3rd item: the live notification that was queued up front.
        let third = notifications.next().await.transpose()?;
        assert_eq!(third, Some(new_notification));

        Ok(())
    }

    /// ExEx head one block ahead of the node head: the ExEx block is reverted via the WAL and
    /// the live notification is delivered afterwards.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let wal_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(wal_dir.path()).unwrap();

        let factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&factory)?;
        let genesis_block = factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(factory)?;

        // ExEx head: a block on top of genesis that only exists in the WAL.
        let exex_head_params =
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() };
        let exex_head_block = random_block(&mut rng, genesis_block.number + 1, exex_head_params);
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node is still at genesis while the ExEx is one block further.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // Live notification: another block on top of genesis.
        let live_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), ..Default::default() },
        )
        .try_recover()?;
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![live_block], Default::default(), BTreeMap::new())),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);
        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // 1st item: revert of the ExEx head block, which brings the ExEx back onto the canonical
        // chain.
        let first = notifications.next().await.transpose()?;
        assert_eq!(first, Some(exex_head_notification.into_inverted()));

        // 2nd item: the live notification that was queued up front.
        let second = notifications.next().await.transpose()?;
        assert_eq!(second, Some(new_notification));

        Ok(())
    }
}