Skip to main content

reth_exex/
notifications.rs

1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_stages_api::ExecutionStageThresholds;
11use reth_tracing::tracing::debug;
12use std::{
13    collections::VecDeque,
14    fmt::Debug,
15    pin::Pin,
16    sync::Arc,
17    task::{ready, Context, Poll},
18};
19use tokio::sync::mpsc::Receiver;
20
21/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
22/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
23/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
24#[derive(Debug)]
25pub struct ExExNotifications<P, E>
26where
27    E: ConfigureEvm,
28{
29    inner: ExExNotificationsInner<P, E>,
30}
31
32/// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications
33/// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`]
34/// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head.
35pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
36    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
37{
38    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head.
39    ///
40    /// It's a no-op if the stream has already been configured without a head.
41    ///
42    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
43    fn set_without_head(&mut self);
44
45    /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s with the provided
46    /// head.
47    ///
48    /// It's a no-op if the stream has already been configured with a head.
49    ///
50    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
51    fn set_with_head(&mut self, exex_head: ExExHead);
52
53    /// Returns a new [`ExExNotificationsStream`] without a head.
54    ///
55    /// See the documentation of [`ExExNotificationsWithoutHead`] for more details.
56    fn without_head(self) -> Self
57    where
58        Self: Sized;
59
60    /// Returns a new [`ExExNotificationsStream`] with the provided head.
61    ///
62    /// See the documentation of [`ExExNotificationsWithHead`] for more details.
63    fn with_head(self, exex_head: ExExHead) -> Self
64    where
65        Self: Sized;
66
67    /// Sets custom thresholds for the backfill job.
68    ///
69    /// These thresholds control how many blocks are included in each backfill notification.
70    /// Only takes effect when the stream is configured with a head.
71    ///
72    /// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
73    /// per batch, bounded by 30s execution time).
74    fn set_backfill_thresholds(&mut self, _thresholds: ExecutionStageThresholds) {}
75}
76
77#[derive(Debug)]
78enum ExExNotificationsInner<P, E>
79where
80    E: ConfigureEvm,
81{
82    /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
83    WithoutHead(ExExNotificationsWithoutHead<P, E>),
84    /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that
85    /// are committed or reverted after the given head.
86    WithHead(Box<ExExNotificationsWithHead<P, E>>),
87    /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and
88    /// [`ExExNotificationsInner::WithHead`].
89    Invalid,
90}
91
92impl<P, E> ExExNotifications<P, E>
93where
94    E: ConfigureEvm,
95{
96    /// Creates a new stream of [`ExExNotifications`] without a head.
97    pub const fn new(
98        node_head: BlockNumHash,
99        provider: P,
100        evm_config: E,
101        notifications: Receiver<ExExNotification<E::Primitives>>,
102        wal_handle: WalHandle<E::Primitives>,
103    ) -> Self {
104        Self {
105            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
106                node_head,
107                provider,
108                evm_config,
109                notifications,
110                wal_handle,
111            )),
112        }
113    }
114}
115
116impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
117where
118    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
119    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
120{
121    fn set_without_head(&mut self) {
122        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
123        self.inner = ExExNotificationsInner::WithoutHead(match current {
124            ExExNotificationsInner::WithoutHead(notifications) => notifications,
125            ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
126                notifications.initial_local_head,
127                notifications.provider,
128                notifications.evm_config,
129                notifications.notifications,
130                notifications.wal_handle,
131            ),
132            ExExNotificationsInner::Invalid => unreachable!(),
133        });
134    }
135
136    fn set_with_head(&mut self, exex_head: ExExHead) {
137        let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
138        self.inner = ExExNotificationsInner::WithHead(match current {
139            ExExNotificationsInner::WithoutHead(notifications) => {
140                Box::new(notifications.with_head(exex_head))
141            }
142            ExExNotificationsInner::WithHead(notifications) => {
143                Box::new(ExExNotificationsWithHead::new(
144                    notifications.initial_local_head,
145                    notifications.provider,
146                    notifications.evm_config,
147                    notifications.notifications,
148                    notifications.wal_handle,
149                    exex_head,
150                ))
151            }
152            ExExNotificationsInner::Invalid => unreachable!(),
153        });
154    }
155
156    fn without_head(mut self) -> Self {
157        self.set_without_head();
158        self
159    }
160
161    fn with_head(mut self, exex_head: ExExHead) -> Self {
162        self.set_with_head(exex_head);
163        self
164    }
165
166    fn set_backfill_thresholds(&mut self, thresholds: ExecutionStageThresholds) {
167        if let ExExNotificationsInner::WithHead(notifications) = &mut self.inner {
168            notifications.backfill_thresholds = Some(thresholds);
169        }
170    }
171}
172
173impl<P, E> Stream for ExExNotifications<P, E>
174where
175    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
176    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
177{
178    type Item = eyre::Result<ExExNotification<E::Primitives>>;
179
180    fn poll_next(
181        self: std::pin::Pin<&mut Self>,
182        cx: &mut std::task::Context<'_>,
183    ) -> std::task::Poll<Option<Self::Item>> {
184        match &mut self.get_mut().inner {
185            ExExNotificationsInner::WithoutHead(notifications) => {
186                notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
187            }
188            ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
189            ExExNotificationsInner::Invalid => unreachable!(),
190        }
191    }
192}
193
194/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
195pub struct ExExNotificationsWithoutHead<P, E>
196where
197    E: ConfigureEvm,
198{
199    node_head: BlockNumHash,
200    provider: P,
201    evm_config: E,
202    notifications: Receiver<ExExNotification<E::Primitives>>,
203    wal_handle: WalHandle<E::Primitives>,
204}
205
206impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
207where
208    E: ConfigureEvm + Debug,
209{
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("ExExNotifications")
212            .field("provider", &self.provider)
213            .field("evm_config", &self.evm_config)
214            .field("notifications", &self.notifications)
215            .finish()
216    }
217}
218
219impl<P, E> ExExNotificationsWithoutHead<P, E>
220where
221    E: ConfigureEvm,
222{
223    /// Creates a new instance of [`ExExNotificationsWithoutHead`].
224    const fn new(
225        node_head: BlockNumHash,
226        provider: P,
227        evm_config: E,
228        notifications: Receiver<ExExNotification<E::Primitives>>,
229        wal_handle: WalHandle<E::Primitives>,
230    ) -> Self {
231        Self { node_head, provider, evm_config, notifications, wal_handle }
232    }
233
234    /// Subscribe to notifications with the given head.
235    fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
236        ExExNotificationsWithHead::new(
237            self.node_head,
238            self.provider,
239            self.evm_config,
240            self.notifications,
241            self.wal_handle,
242            head,
243        )
244    }
245}
246
247impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
248where
249    E: ConfigureEvm,
250{
251    type Item = ExExNotification<E::Primitives>;
252
253    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254        self.get_mut().notifications.poll_recv(cx)
255    }
256}
257
258/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
259/// committed or reverted after the given head. The head is the ExEx's latest view of the host
260/// chain.
261///
262/// Notifications will be sent starting from the head, not inclusive. For example, if
263/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An
264/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to
265/// process block 11.
266#[derive(Debug)]
267pub struct ExExNotificationsWithHead<P, E>
268where
269    E: ConfigureEvm,
270{
271    /// The node's local head at launch.
272    initial_local_head: BlockNumHash,
273    provider: P,
274    evm_config: E,
275    notifications: Receiver<ExExNotification<E::Primitives>>,
276    wal_handle: WalHandle<E::Primitives>,
277    /// The exex head at launch
278    initial_exex_head: ExExHead,
279
280    /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
281    /// revert its head.
282    pending_check_canonical: bool,
283    /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
284    /// the missing blocks.
285    pending_check_backfill: bool,
286    /// The backfill job to run before consuming any notifications.
287    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
288    /// Custom thresholds for the backfill job, if set.
289    backfill_thresholds: Option<ExecutionStageThresholds>,
290    /// Notifications that arrived during backfill and need to be delivered after it completes.
291    /// These are notifications for blocks beyond the backfill range that we must not drop.
292    pending_notifications: VecDeque<ExExNotification<E::Primitives>>,
293}
294
295impl<P, E> ExExNotificationsWithHead<P, E>
296where
297    E: ConfigureEvm,
298{
299    /// Creates a new [`ExExNotificationsWithHead`].
300    const fn new(
301        node_head: BlockNumHash,
302        provider: P,
303        evm_config: E,
304        notifications: Receiver<ExExNotification<E::Primitives>>,
305        wal_handle: WalHandle<E::Primitives>,
306        exex_head: ExExHead,
307    ) -> Self {
308        Self {
309            initial_local_head: node_head,
310            provider,
311            evm_config,
312            notifications,
313            wal_handle,
314            initial_exex_head: exex_head,
315            pending_check_canonical: true,
316            pending_check_backfill: true,
317            backfill_job: None,
318            backfill_thresholds: None,
319            pending_notifications: VecDeque::new(),
320        }
321    }
322
323    /// Sets custom thresholds for the backfill job.
324    ///
325    /// These thresholds control how many blocks are included in each backfill notification.
326    /// By default, the backfill job uses [`BackfillJobFactory`] defaults (up to 500,000 blocks
327    /// per batch, bounded by 30s execution time).
328    ///
329    /// If your ExEx is memory-constrained, consider setting a lower `max_blocks` value to
330    /// reduce the size of each backfill notification.
331    pub const fn with_backfill_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
332        self.backfill_thresholds = Some(thresholds);
333        self
334    }
335}
336
337impl<P, E> ExExNotificationsWithHead<P, E>
338where
339    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
340    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
341{
342    /// Checks if the ExEx head is on the canonical chain.
343    ///
344    /// If the head block is not found in the database or it's ahead of the node head, it means
345    /// we're not on the canonical chain and we need to revert the notification with the ExEx
346    /// head block.
347    fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
348        if self.provider.is_known(self.initial_exex_head.block.hash)? &&
349            self.initial_exex_head.block.number <= self.initial_local_head.number
350        {
351            // we have the targeted block and that block is below the current head
352            debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
353            return Ok(None)
354        }
355
356        // If the head block is not found in the database, it means we're not on the canonical
357        // chain.
358
359        // Get the committed notification for the head block from the WAL.
360        let Some(notification) = self
361            .wal_handle
362            .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
363        else {
364            // it's possible that the exex head is further ahead
365            if self.initial_exex_head.block.number > self.initial_local_head.number {
366                debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
367                return Ok(None);
368            }
369
370            return Err(eyre::eyre!(
371                "Could not find notification for block hash {:?} in the WAL",
372                self.initial_exex_head.block.hash
373            ))
374        };
375
376        // Update the head block hash to the parent hash of the first committed block.
377        let committed_chain = notification.committed_chain().unwrap();
378        let new_exex_head =
379            (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
380        debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
381        self.initial_exex_head.block = new_exex_head;
382
383        // Return an inverted notification. See the documentation for
384        // `ExExNotification::into_inverted`.
385        Ok(Some(notification.into_inverted()))
386    }
387
388    /// Compares the node head against the ExEx head, and backfills if needed.
389    ///
390    /// CAUTION: This method assumes that the ExEx head is <= the node head, and that it's on the
391    /// canonical chain.
392    ///
393    /// Possible situations are:
394    /// - ExEx is behind the node head (`exex_head.number < node_head.number`). Backfill from the
395    ///   node database.
396    /// - ExEx is at the same block number as the node head (`exex_head.number ==
397    ///   node_head.number`). Nothing to do.
398    fn check_backfill(&mut self) -> eyre::Result<()> {
399        let mut backfill_job_factory =
400            BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
401        if let Some(thresholds) = self.backfill_thresholds.clone() {
402            backfill_job_factory = backfill_job_factory.with_thresholds(thresholds);
403        }
404        match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
405            std::cmp::Ordering::Less => {
406                // ExEx is behind the node head, start backfill
407                debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
408                let backfill = backfill_job_factory
409                    .backfill(
410                        self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
411                    )
412                    .into_stream();
413                self.backfill_job = Some(backfill);
414            }
415            std::cmp::Ordering::Equal => {
416                debug!(target: "exex::notifications", "ExEx is at the node head");
417            }
418            std::cmp::Ordering::Greater => {
419                debug!(target: "exex::notifications", "ExEx is ahead of the node head");
420            }
421        };
422
423        Ok(())
424    }
425}
426
427impl<P, E> Stream for ExExNotificationsWithHead<P, E>
428where
429    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
430    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
431{
432    type Item = eyre::Result<ExExNotification<E::Primitives>>;
433
434    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
435        let this = self.get_mut();
436
437        // 1. Check once whether we need to retrieve a notification gap from the WAL.
438        if this.pending_check_canonical {
439            if let Some(canonical_notification) = this.check_canonical()? {
440                return Poll::Ready(Some(Ok(canonical_notification)))
441            }
442
443            // ExEx head is on the canonical chain, we no longer need to check it
444            this.pending_check_canonical = false;
445        }
446
447        // 2. Check once whether we need to trigger backfill sync
448        if this.pending_check_backfill {
449            this.check_backfill()?;
450            this.pending_check_backfill = false;
451        }
452
453        // 3. If backfill is in progress yield new notifications
454        if let Some(backfill_job) = &mut this.backfill_job {
455            debug!(target: "exex::notifications", "Polling backfill job");
456
457            // Drain the notification channel to prevent backpressure from stalling the
458            // ExExManager. During backfill, the ExEx is not consuming from the channel,
459            // so the capacity-1 channel fills up, which blocks the manager's PollSender,
460            // which fills the manager's 1024-entry buffer, which blocks all upstream
461            // senders. Notifications for blocks covered by the backfill range are
462            // discarded (they'll be re-delivered by the backfill job), while
463            // notifications beyond the backfill range are buffered for delivery after the
464            // backfill completes.
465            while let Poll::Ready(Some(notification)) = this.notifications.poll_recv(cx) {
466                // Always buffer revert-containing notifications (ChainReverted,
467                // ChainReorged) because the backfill job only re-delivers
468                // ChainCommitted from the database. Discarding a reorg here would
469                // leave the ExEx unaware of the fork switch.
470                if notification.reverted_chain().is_some() {
471                    this.pending_notifications.push_back(notification);
472                    continue;
473                }
474                if let Some(committed) = notification.committed_chain() &&
475                    committed.tip().number() <= this.initial_local_head.number
476                {
477                    // Covered by backfill range, safe to discard
478                    continue;
479                }
480                // Beyond the backfill range — buffer for delivery after backfill
481                this.pending_notifications.push_back(notification);
482            }
483
484            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
485                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
486                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
487                    new: Arc::new(chain),
488                })))
489            }
490
491            // Backfill job is done, remove it
492            this.backfill_job = None;
493        }
494
495        // 4. Deliver any notifications that were buffered during backfill
496        if let Some(notification) = this.pending_notifications.pop_front() {
497            return Poll::Ready(Some(Ok(notification)))
498        }
499
500        // 5. Otherwise advance the regular event stream
501        loop {
502            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
503                return Poll::Ready(None)
504            };
505
506            // 6. In case the exex is ahead of the new tip, we must skip it
507            if let Some(committed) = notification.committed_chain() {
508                // inclusive check because we should start with `exex.head + 1`
509                if this.initial_exex_head.block.number >= committed.tip().number() {
510                    continue
511                }
512            }
513
514            return Poll::Ready(Some(Ok(notification)))
515        }
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use super::*;
522    use crate::Wal;
523    use alloy_consensus::Header;
524    use alloy_eips::BlockNumHash;
525    use eyre::OptionExt;
526    use futures::StreamExt;
527    use reth_db_common::init::init_genesis;
528    use reth_ethereum_primitives::Block;
529    use reth_evm_ethereum::EthEvmConfig;
530    use reth_primitives_traits::Block as _;
531    use reth_provider::{
532        providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
533        Chain, DBProvider, DatabaseProviderFactory,
534    };
535    use reth_testing_utils::generators::{self, random_block, BlockParams};
536    use std::collections::BTreeMap;
537    use tokio::sync::mpsc;
538
539    #[tokio::test]
540    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
541        let mut rng = generators::rng();
542
543        let temp_dir = tempfile::tempdir().unwrap();
544        let wal = Wal::new(temp_dir.path()).unwrap();
545
546        let provider_factory = create_test_provider_factory();
547        let genesis_hash = init_genesis(&provider_factory)?;
548        let genesis_block = provider_factory
549            .block(genesis_hash.into())?
550            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
551
552        let provider = BlockchainProvider::new(provider_factory.clone())?;
553
554        let node_head_block = random_block(
555            &mut rng,
556            genesis_block.number + 1,
557            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
558        )
559        .try_recover()?;
560        let node_head = node_head_block.num_hash();
561        let provider_rw = provider_factory.provider_rw()?;
562        provider_rw.insert_block(&node_head_block)?;
563        provider_rw.commit()?;
564        let exex_head =
565            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
566
567        let notification = ExExNotification::ChainCommitted {
568            new: Arc::new(Chain::new(
569                vec![random_block(
570                    &mut rng,
571                    node_head.number + 1,
572                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
573                )
574                .try_recover()?],
575                Default::default(),
576                BTreeMap::new(),
577            )),
578        };
579
580        let (notifications_tx, notifications_rx) = mpsc::channel(1);
581
582        notifications_tx.send(notification.clone()).await?;
583
584        let mut notifications = ExExNotificationsWithoutHead::new(
585            node_head,
586            provider,
587            EthEvmConfig::mainnet(),
588            notifications_rx,
589            wal.handle(),
590        )
591        .with_head(exex_head);
592
593        // First notification is the backfill of missing blocks from the canonical chain
594        assert_eq!(
595            notifications.next().await.transpose()?,
596            Some(ExExNotification::ChainCommitted {
597                new: Arc::new(
598                    BackfillJobFactory::new(
599                        notifications.evm_config.clone(),
600                        notifications.provider.clone()
601                    )
602                    .backfill(1..=1)
603                    .next()
604                    .ok_or_eyre("failed to backfill")??
605                )
606            })
607        );
608
609        // Second notification is the actual notification that we sent before
610        assert_eq!(notifications.next().await.transpose()?, Some(notification));
611
612        Ok(())
613    }
614
615    #[tokio::test]
616    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
617        let temp_dir = tempfile::tempdir().unwrap();
618        let wal = Wal::new(temp_dir.path()).unwrap();
619
620        let provider_factory = create_test_provider_factory();
621        let genesis_hash = init_genesis(&provider_factory)?;
622        let genesis_block = provider_factory
623            .block(genesis_hash.into())?
624            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
625
626        let provider = BlockchainProvider::new(provider_factory)?;
627
628        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
629        let exex_head = ExExHead { block: node_head };
630
631        let notification = ExExNotification::ChainCommitted {
632            new: Arc::new(Chain::new(
633                vec![Block {
634                    header: Header {
635                        parent_hash: node_head.hash,
636                        number: node_head.number + 1,
637                        ..Default::default()
638                    },
639                    ..Default::default()
640                }
641                .seal_slow()
642                .try_recover()?],
643                Default::default(),
644                BTreeMap::new(),
645            )),
646        };
647
648        let (notifications_tx, notifications_rx) = mpsc::channel(1);
649
650        notifications_tx.send(notification.clone()).await?;
651
652        let mut notifications = ExExNotificationsWithoutHead::new(
653            node_head,
654            provider,
655            EthEvmConfig::mainnet(),
656            notifications_rx,
657            wal.handle(),
658        )
659        .with_head(exex_head);
660
661        let new_notification = notifications.next().await.transpose()?;
662        assert_eq!(new_notification, Some(notification));
663
664        Ok(())
665    }
666
667    #[tokio::test]
668    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
669        let mut rng = generators::rng();
670
671        let temp_dir = tempfile::tempdir().unwrap();
672        let wal = Wal::new(temp_dir.path()).unwrap();
673
674        let provider_factory = create_test_provider_factory();
675        let genesis_hash = init_genesis(&provider_factory)?;
676        let genesis_block = provider_factory
677            .block(genesis_hash.into())?
678            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
679
680        let provider = BlockchainProvider::new(provider_factory)?;
681
682        let node_head_block = random_block(
683            &mut rng,
684            genesis_block.number + 1,
685            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
686        )
687        .try_recover()?;
688        let node_head = node_head_block.num_hash();
689        let provider_rw = provider.database_provider_rw()?;
690        provider_rw.insert_block(&node_head_block)?;
691        provider_rw.commit()?;
692        let node_head_notification = ExExNotification::ChainCommitted {
693            new: Arc::new(
694                BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
695                    .backfill(node_head.number..=node_head.number)
696                    .next()
697                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
698            ),
699        };
700
701        let exex_head_block = random_block(
702            &mut rng,
703            genesis_block.number + 1,
704            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
705        );
706        let exex_head = ExExHead { block: exex_head_block.num_hash() };
707        let exex_head_notification = ExExNotification::ChainCommitted {
708            new: Arc::new(Chain::new(
709                vec![exex_head_block.clone().try_recover()?],
710                Default::default(),
711                BTreeMap::new(),
712            )),
713        };
714        wal.commit(&exex_head_notification)?;
715
716        let new_notification = ExExNotification::ChainCommitted {
717            new: Arc::new(Chain::new(
718                vec![random_block(
719                    &mut rng,
720                    node_head.number + 1,
721                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
722                )
723                .try_recover()?],
724                Default::default(),
725                BTreeMap::new(),
726            )),
727        };
728
729        let (notifications_tx, notifications_rx) = mpsc::channel(1);
730
731        notifications_tx.send(new_notification.clone()).await?;
732
733        let mut notifications = ExExNotificationsWithoutHead::new(
734            node_head,
735            provider,
736            EthEvmConfig::mainnet(),
737            notifications_rx,
738            wal.handle(),
739        )
740        .with_head(exex_head);
741
742        // First notification is the revert of the ExEx head block to get back to the canonical
743        // chain
744        assert_eq!(
745            notifications.next().await.transpose()?,
746            Some(exex_head_notification.into_inverted())
747        );
748        // Second notification is the backfilled block from the canonical chain to get back to the
749        // canonical tip
750        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
751        // Third notification is the actual notification that we sent before
752        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
753
754        Ok(())
755    }
756
757    #[tokio::test]
758    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
759        reth_tracing::init_test_tracing();
760        let mut rng = generators::rng();
761
762        let temp_dir = tempfile::tempdir().unwrap();
763        let wal = Wal::new(temp_dir.path()).unwrap();
764
765        let provider_factory = create_test_provider_factory();
766        let genesis_hash = init_genesis(&provider_factory)?;
767        let genesis_block = provider_factory
768            .block(genesis_hash.into())?
769            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
770
771        let provider = BlockchainProvider::new(provider_factory)?;
772
773        let exex_head_block = random_block(
774            &mut rng,
775            genesis_block.number + 1,
776            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
777        );
778        let exex_head_notification = ExExNotification::ChainCommitted {
779            new: Arc::new(Chain::new(
780                vec![exex_head_block.clone().try_recover()?],
781                Default::default(),
782                BTreeMap::new(),
783            )),
784        };
785        wal.commit(&exex_head_notification)?;
786
787        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
788        let exex_head = ExExHead {
789            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
790        };
791
792        let new_notification = ExExNotification::ChainCommitted {
793            new: Arc::new(Chain::new(
794                vec![random_block(
795                    &mut rng,
796                    genesis_block.number + 1,
797                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
798                )
799                .try_recover()?],
800                Default::default(),
801                BTreeMap::new(),
802            )),
803        };
804
805        let (notifications_tx, notifications_rx) = mpsc::channel(1);
806
807        notifications_tx.send(new_notification.clone()).await?;
808
809        let mut notifications = ExExNotificationsWithoutHead::new(
810            node_head,
811            provider,
812            EthEvmConfig::mainnet(),
813            notifications_rx,
814            wal.handle(),
815        )
816        .with_head(exex_head);
817
818        // First notification is the revert of the ExEx head block to get back to the canonical
819        // chain
820        assert_eq!(
821            notifications.next().await.transpose()?,
822            Some(exex_head_notification.into_inverted())
823        );
824
825        // Second notification is the actual notification that we sent before
826        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
827
828        Ok(())
829    }
830
831    /// Regression test for <https://github.com/paradigmxyz/reth/issues/19665>.
832    ///
833    /// During backfill, `poll_next` must drain the notification channel so that
834    /// the upstream `ExExManager` is never blocked by a full channel. Without
835    /// the drain loop the capacity-1 channel stays full for the entire backfill
836    /// duration, which stalls the manager's `PollSender` and eventually blocks
837    /// all upstream senders once the 1024-entry buffer fills up.
838    ///
839    /// The key assertion is the `try_send` after the first `poll_next`: it
840    /// proves the channel was drained during the backfill poll. Without the
841    /// fix this `try_send` fails because the notification is still sitting in
842    /// the channel.
843    #[tokio::test]
844    async fn exex_notifications_backfill_drains_channel() -> eyre::Result<()> {
845        let mut rng = generators::rng();
846
847        let temp_dir = tempfile::tempdir().unwrap();
848        let wal = Wal::new(temp_dir.path()).unwrap();
849
850        let provider_factory = create_test_provider_factory();
851        let genesis_hash = init_genesis(&provider_factory)?;
852        let genesis_block = provider_factory
853            .block(genesis_hash.into())?
854            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
855
856        let provider = BlockchainProvider::new(provider_factory.clone())?;
857
858        // Insert block 1 into the DB so there's something to backfill
859        let node_head_block = random_block(
860            &mut rng,
861            genesis_block.number + 1,
862            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
863        )
864        .try_recover()?;
865        let node_head = node_head_block.num_hash();
866        let provider_rw = provider_factory.provider_rw()?;
867        provider_rw.insert_block(&node_head_block)?;
868        provider_rw.commit()?;
869
870        // ExEx head is at genesis — backfill will run for block 1
871        let exex_head =
872            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
873
874        // Notification for a block AFTER the backfill range (block 2).
875        let post_backfill_notification = ExExNotification::ChainCommitted {
876            new: Arc::new(Chain::new(
877                vec![random_block(
878                    &mut rng,
879                    node_head.number + 1,
880                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
881                )
882                .try_recover()?],
883                Default::default(),
884                BTreeMap::new(),
885            )),
886        };
887
888        // Another notification (block 3) used to probe channel capacity.
889        let probe_notification = ExExNotification::ChainCommitted {
890            new: Arc::new(Chain::new(
891                vec![random_block(
892                    &mut rng,
893                    node_head.number + 2,
894                    BlockParams { parent: None, ..Default::default() },
895                )
896                .try_recover()?],
897                Default::default(),
898                BTreeMap::new(),
899            )),
900        };
901
902        let (notifications_tx, notifications_rx) = mpsc::channel(1);
903
904        // Fill the capacity-1 channel.
905        notifications_tx.send(post_backfill_notification.clone()).await?;
906
907        // Confirm the channel is full — this is the precondition that causes the
908        // stall in production: the ExExManager's PollSender would block here.
909        assert!(
910            notifications_tx.try_send(probe_notification.clone()).is_err(),
911            "channel should be full before backfill poll"
912        );
913
914        let mut notifications = ExExNotificationsWithoutHead::new(
915            node_head,
916            provider,
917            EthEvmConfig::mainnet(),
918            notifications_rx,
919            wal.handle(),
920        )
921        .with_head(exex_head);
922
923        // Poll once — this returns the backfill result for block 1. Crucially,
924        // the drain loop in poll_next runs in this same call, consuming the
925        // notification from the channel and buffering it.
926        let backfill_result = notifications.next().await.transpose()?;
927        assert_eq!(
928            backfill_result,
929            Some(ExExNotification::ChainCommitted {
930                new: Arc::new(
931                    BackfillJobFactory::new(
932                        notifications.evm_config.clone(),
933                        notifications.provider.clone()
934                    )
935                    .backfill(1..=1)
936                    .next()
937                    .ok_or_eyre("failed to backfill")??
938                )
939            })
940        );
941
942        // KEY ASSERTION: the channel was drained during the backfill poll above.
943        // Without the drain loop this try_send fails because the original
944        // notification is still occupying the capacity-1 channel.
945        assert!(
946            notifications_tx.try_send(probe_notification.clone()).is_ok(),
947            "channel should have been drained during backfill poll"
948        );
949
950        // The first buffered notification (block 2) was drained from the channel
951        // during backfill and is delivered now.
952        let buffered = notifications.next().await.transpose()?;
953        assert_eq!(buffered, Some(post_backfill_notification));
954
955        // The probe notification (block 3) that we just sent is delivered next.
956        let probe = notifications.next().await.transpose()?;
957        assert_eq!(probe, Some(probe_notification));
958
959        Ok(())
960    }
961}