1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_stages_api::ExecutionStageThresholds;
11use reth_tracing::tracing::debug;
12use std::{
13 collections::VecDeque,
14 fmt::Debug,
15 pin::Pin,
16 sync::Arc,
17 task::{ready, Context, Poll},
18};
19use tokio::sync::mpsc::Receiver;
20
/// A stream of [`ExExNotification`]s.
///
/// The stream is a thin wrapper around [`ExExNotificationsInner`], which holds either a
/// head-less stream or a stream configured with an ExEx head. It is created in the head-less
/// mode by [`ExExNotifications::new`].
#[derive(Debug)]
pub struct ExExNotifications<P, E>
where
    E: ConfigureEvm,
{
    /// The currently active mode of the stream.
    inner: ExExNotificationsInner<P, E>,
}
31
/// A fallible stream of [`ExExNotification`]s that can be configured with or without an ExEx
/// head.
///
/// Implementors yield `eyre::Result<ExExNotification<N>>` items and must be [`Unpin`].
pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
{
    /// Switches the stream, in place, to the mode that has no ExEx head.
    fn set_without_head(&mut self);

    /// Switches the stream, in place, to the mode configured with the given `exex_head`.
    fn set_with_head(&mut self, exex_head: ExExHead);

    /// By-value counterpart of [`Self::set_without_head`]: takes the stream and returns a
    /// stream without an ExEx head.
    fn without_head(self) -> Self
    where
        Self: Sized;

    /// By-value counterpart of [`Self::set_with_head`]: takes the stream and returns a stream
    /// configured with the given `exex_head`.
    fn with_head(self, exex_head: ExExHead) -> Self
    where
        Self: Sized;

    /// Sets custom thresholds for the backfill job.
    ///
    /// The default implementation ignores the thresholds and does nothing.
    fn set_backfill_thresholds(&mut self, _thresholds: ExecutionStageThresholds) {}
}
76
/// The mode an [`ExExNotifications`] stream is currently in.
#[derive(Debug)]
enum ExExNotificationsInner<P, E>
where
    E: ConfigureEvm,
{
    /// A stream of notifications without an ExEx head.
    WithoutHead(ExExNotificationsWithoutHead<P, E>),
    /// A stream of notifications configured with an ExEx head. Stored behind a `Box`.
    WithHead(Box<ExExNotificationsWithHead<P, E>>),
    /// Temporary placeholder left in `inner` while ownership of the previous variant is taken
    /// during a mode switch. Observing it anywhere else hits `unreachable!()`.
    Invalid,
}
91
92impl<P, E> ExExNotifications<P, E>
93where
94 E: ConfigureEvm,
95{
96 pub const fn new(
98 node_head: BlockNumHash,
99 provider: P,
100 evm_config: E,
101 notifications: Receiver<ExExNotification<E::Primitives>>,
102 wal_handle: WalHandle<E::Primitives>,
103 ) -> Self {
104 Self {
105 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
106 node_head,
107 provider,
108 evm_config,
109 notifications,
110 wal_handle,
111 )),
112 }
113 }
114}
115
116impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
117where
118 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
119 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
120{
121 fn set_without_head(&mut self) {
122 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
123 self.inner = ExExNotificationsInner::WithoutHead(match current {
124 ExExNotificationsInner::WithoutHead(notifications) => notifications,
125 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
126 notifications.initial_local_head,
127 notifications.provider,
128 notifications.evm_config,
129 notifications.notifications,
130 notifications.wal_handle,
131 ),
132 ExExNotificationsInner::Invalid => unreachable!(),
133 });
134 }
135
136 fn set_with_head(&mut self, exex_head: ExExHead) {
137 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
138 self.inner = ExExNotificationsInner::WithHead(match current {
139 ExExNotificationsInner::WithoutHead(notifications) => {
140 Box::new(notifications.with_head(exex_head))
141 }
142 ExExNotificationsInner::WithHead(notifications) => {
143 Box::new(ExExNotificationsWithHead::new(
144 notifications.initial_local_head,
145 notifications.provider,
146 notifications.evm_config,
147 notifications.notifications,
148 notifications.wal_handle,
149 exex_head,
150 ))
151 }
152 ExExNotificationsInner::Invalid => unreachable!(),
153 });
154 }
155
156 fn without_head(mut self) -> Self {
157 self.set_without_head();
158 self
159 }
160
161 fn with_head(mut self, exex_head: ExExHead) -> Self {
162 self.set_with_head(exex_head);
163 self
164 }
165
166 fn set_backfill_thresholds(&mut self, thresholds: ExecutionStageThresholds) {
167 if let ExExNotificationsInner::WithHead(notifications) = &mut self.inner {
168 notifications.backfill_thresholds = Some(thresholds);
169 }
170 }
171}
172
173impl<P, E> Stream for ExExNotifications<P, E>
174where
175 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
176 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
177{
178 type Item = eyre::Result<ExExNotification<E::Primitives>>;
179
180 fn poll_next(
181 self: std::pin::Pin<&mut Self>,
182 cx: &mut std::task::Context<'_>,
183 ) -> std::task::Poll<Option<Self::Item>> {
184 match &mut self.get_mut().inner {
185 ExExNotificationsInner::WithoutHead(notifications) => {
186 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
187 }
188 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
189 ExExNotificationsInner::Invalid => unreachable!(),
190 }
191 }
192}
193
/// A stream of [`ExExNotification`]s that forwards whatever arrives on the notifications
/// channel, without taking any ExEx head into account.
pub struct ExExNotificationsWithoutHead<P, E>
where
    E: ConfigureEvm,
{
    /// The node head supplied at construction; handed over when a head is configured.
    node_head: BlockNumHash,
    /// Provider, kept so it can be handed over when a head is configured.
    provider: P,
    /// EVM configuration, kept so it can be handed over when a head is configured.
    evm_config: E,
    /// Receiving end of the notifications channel.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Handle to the WAL, kept so it can be handed over when a head is configured.
    wal_handle: WalHandle<E::Primitives>,
}
205
206impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
207where
208 E: ConfigureEvm + Debug,
209{
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("ExExNotifications")
212 .field("provider", &self.provider)
213 .field("evm_config", &self.evm_config)
214 .field("notifications", &self.notifications)
215 .finish()
216 }
217}
218
219impl<P, E> ExExNotificationsWithoutHead<P, E>
220where
221 E: ConfigureEvm,
222{
223 const fn new(
225 node_head: BlockNumHash,
226 provider: P,
227 evm_config: E,
228 notifications: Receiver<ExExNotification<E::Primitives>>,
229 wal_handle: WalHandle<E::Primitives>,
230 ) -> Self {
231 Self { node_head, provider, evm_config, notifications, wal_handle }
232 }
233
234 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
236 ExExNotificationsWithHead::new(
237 self.node_head,
238 self.provider,
239 self.evm_config,
240 self.notifications,
241 self.wal_handle,
242 head,
243 )
244 }
245}
246
247impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
248where
249 E: ConfigureEvm,
250{
251 type Item = ExExNotification<E::Primitives>;
252
253 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254 self.get_mut().notifications.poll_recv(cx)
255 }
256}
257
/// A stream of [`ExExNotification`]s configured with an ExEx head.
///
/// On the first polls it checks whether the ExEx head is on the canonical chain and whether
/// a backfill up to the node head is needed, and only afterwards forwards notifications from
/// the channel.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<P, E>
where
    E: ConfigureEvm,
{
    /// The node head supplied at construction.
    initial_local_head: BlockNumHash,
    /// Provider used to check whether the ExEx head is known and to run the backfill.
    provider: P,
    /// EVM configuration used for the backfill job.
    evm_config: E,
    /// Receiving end of the notifications channel.
    notifications: Receiver<ExExNotification<E::Primitives>>,
    /// Handle to the WAL, used to look up the notification that committed the ExEx head.
    wal_handle: WalHandle<E::Primitives>,
    /// The ExEx head. The canonical check may move it back to the parent of a chain found
    /// in the WAL.
    initial_exex_head: ExExHead,

    /// Whether the canonical check still has to run. Cleared once the check reports that
    /// nothing more needs to be reverted.
    pending_check_canonical: bool,
    /// Whether the backfill check still has to run. It runs once.
    pending_check_backfill: bool,
    /// The backfill job currently in progress, if any.
    backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
    /// Optional custom thresholds applied to the backfill job factory.
    backfill_thresholds: Option<ExecutionStageThresholds>,
    /// Notifications taken off the channel while a backfill was running; they are delivered
    /// after the backfill has finished.
    pending_notifications: VecDeque<ExExNotification<E::Primitives>>,
}
294
295impl<P, E> ExExNotificationsWithHead<P, E>
296where
297 E: ConfigureEvm,
298{
299 const fn new(
301 node_head: BlockNumHash,
302 provider: P,
303 evm_config: E,
304 notifications: Receiver<ExExNotification<E::Primitives>>,
305 wal_handle: WalHandle<E::Primitives>,
306 exex_head: ExExHead,
307 ) -> Self {
308 Self {
309 initial_local_head: node_head,
310 provider,
311 evm_config,
312 notifications,
313 wal_handle,
314 initial_exex_head: exex_head,
315 pending_check_canonical: true,
316 pending_check_backfill: true,
317 backfill_job: None,
318 backfill_thresholds: None,
319 pending_notifications: VecDeque::new(),
320 }
321 }
322
323 pub const fn with_backfill_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
332 self.backfill_thresholds = Some(thresholds);
333 self
334 }
335}
336
337impl<P, E> ExExNotificationsWithHead<P, E>
338where
339 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
340 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
341{
342 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
348 if self.provider.is_known(self.initial_exex_head.block.hash)? &&
349 self.initial_exex_head.block.number <= self.initial_local_head.number
350 {
351 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
353 return Ok(None)
354 }
355
356 let Some(notification) = self
361 .wal_handle
362 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
363 else {
364 if self.initial_exex_head.block.number > self.initial_local_head.number {
366 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
367 return Ok(None);
368 }
369
370 return Err(eyre::eyre!(
371 "Could not find notification for block hash {:?} in the WAL",
372 self.initial_exex_head.block.hash
373 ))
374 };
375
376 let committed_chain = notification.committed_chain().unwrap();
378 let new_exex_head =
379 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
380 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
381 self.initial_exex_head.block = new_exex_head;
382
383 Ok(Some(notification.into_inverted()))
386 }
387
388 fn check_backfill(&mut self) -> eyre::Result<()> {
399 let mut backfill_job_factory =
400 BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
401 if let Some(thresholds) = self.backfill_thresholds.clone() {
402 backfill_job_factory = backfill_job_factory.with_thresholds(thresholds);
403 }
404 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
405 std::cmp::Ordering::Less => {
406 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
408 let backfill = backfill_job_factory
409 .backfill(
410 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
411 )
412 .into_stream();
413 self.backfill_job = Some(backfill);
414 }
415 std::cmp::Ordering::Equal => {
416 debug!(target: "exex::notifications", "ExEx is at the node head");
417 }
418 std::cmp::Ordering::Greater => {
419 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
420 }
421 };
422
423 Ok(())
424 }
425}
426
/// Polling proceeds in phases: the canonical check, the backfill check, the backfill job
/// itself, notifications buffered during the backfill, and finally live notifications from
/// the channel.
impl<P, E> Stream for ExExNotificationsWithHead<P, E>
where
    P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
{
    type Item = eyre::Result<ExExNotification<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // 1. Canonical check. While it keeps producing notifications to revert (or fails),
        //    the flag stays set and the check runs again on the next poll.
        if this.pending_check_canonical {
            if let Some(canonical_notification) = this.check_canonical()? {
                return Poll::Ready(Some(Ok(canonical_notification)))
            }

            // The ExEx head is on the canonical chain; no more canonical checks are needed.
            this.pending_check_canonical = false;
        }

        // 2. Backfill check. Runs once and may install a backfill job.
        if this.pending_check_backfill {
            this.check_backfill()?;
            this.pending_check_backfill = false;
        }

        // 3. Drive the backfill job, if there is one.
        if let Some(backfill_job) = &mut this.backfill_job {
            debug!(target: "exex::notifications", "Polling backfill job");

            // Take everything that is ready off the channel while the backfill is running
            // and buffer what has to be delivered afterwards.
            while let Poll::Ready(Some(notification)) = this.notifications.poll_recv(cx) {
                // Notifications that carry a reverted chain are always kept.
                if notification.reverted_chain().is_some() {
                    this.pending_notifications.push_back(notification);
                    continue;
                }
                // Commits whose tip is not above the node head are dropped: those blocks
                // fall inside the range the backfill covers.
                if let Some(committed) = notification.committed_chain() &&
                    committed.tip().number() <= this.initial_local_head.number
                {
                    continue;
                }
                this.pending_notifications.push_back(notification);
            }

            // Yield the next backfilled chain as a commit notification; return `Pending` if
            // the job is not ready, and propagate its error if it failed.
            if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
                debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
                return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
                    new: Arc::new(chain),
                })))
            }

            // The backfill job is exhausted.
            this.backfill_job = None;
        }

        // 4. Deliver notifications that were buffered during the backfill, in arrival order.
        if let Some(notification) = this.pending_notifications.pop_front() {
            return Poll::Ready(Some(Ok(notification)))
        }

        // 5. Forward live notifications from the channel.
        loop {
            // A closed channel ends the stream.
            let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
                return Poll::Ready(None)
            };

            if let Some(committed) = notification.committed_chain() {
                // Skip commits whose tip is not above the ExEx head.
                if this.initial_exex_head.block.number >= committed.tip().number() {
                    continue
                }
            }

            return Poll::Ready(Some(Ok(notification)))
        }
    }
}
518
519#[cfg(test)]
520mod tests {
521 use super::*;
522 use crate::Wal;
523 use alloy_consensus::Header;
524 use alloy_eips::BlockNumHash;
525 use eyre::OptionExt;
526 use futures::StreamExt;
527 use reth_db_common::init::init_genesis;
528 use reth_ethereum_primitives::Block;
529 use reth_evm_ethereum::EthEvmConfig;
530 use reth_primitives_traits::Block as _;
531 use reth_provider::{
532 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
533 Chain, DBProvider, DatabaseProviderFactory,
534 };
535 use reth_testing_utils::generators::{self, random_block, BlockParams};
536 use std::collections::BTreeMap;
537 use tokio::sync::mpsc;
538
    /// ExEx head is at genesis and the node head is one block ahead on the same chain: the
    /// stream must first yield the backfilled block 1, then the notification from the channel.
    #[tokio::test]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory.clone())?;

        // Insert the node head (block 1, child of genesis) into the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw.insert_block(&node_head_block)?;
        provider_rw.commit()?;
        // The ExEx head is the genesis block, i.e. one block behind the node head.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // A new commit on top of the node head, sent through the channel.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the backfill of block 1, compared against an independent backfill run.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.evm_config.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // Second item: the notification that was sent through the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(notification));

        Ok(())
    }
614
    /// ExEx head and node head are both the genesis block: no revert and no backfill are
    /// expected, so the first item is the notification from the channel.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Both heads point at genesis.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: node_head };

        // A commit of a default block built on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Block {
                    header: Header {
                        parent_hash: node_head.hash,
                        number: node_head.number + 1,
                        ..Default::default()
                    },
                    ..Default::default()
                }
                .seal_slow()
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The channel notification is delivered as is.
        let new_notification = notifications.next().await.transpose()?;
        assert_eq!(new_notification, Some(notification));

        Ok(())
    }
666
    /// ExEx head is at the same height as the node head but on a different block that is only
    /// present in the WAL: the stream must first revert the ExEx head, then backfill the node
    /// head, then deliver the notification from the channel.
    #[tokio::test]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Insert the node head (block 1, child of genesis) into the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(&node_head_block)?;
        provider_rw.commit()?;
        // The notification expected from backfilling the node head.
        let node_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(
                BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
                    .backfill(node_head.number..=node_head.number)
                    .next()
                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
            ),
        };

        // A different random block 1 acts as the ExEx head; it is committed to the WAL only
        // and never inserted into the database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };
        wal.commit(&exex_head_notification)?;

        // A new commit on top of the node head, sent through the channel.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the inverted WAL notification, reverting the ExEx head block.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );
        // Second item: the backfill of the node head.
        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
        // Third item: the notification that was sent through the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
756
    /// ExEx head (block 1, present only in the WAL) is ahead of the node head (genesis): the
    /// stream must first revert the ExEx head, then deliver the notification from the channel.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // The ExEx head block is committed to the WAL only; it is never inserted into the
        // database.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node head is genesis, one block behind the ExEx head.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // A commit of another random block 1 on top of genesis, sent through the channel.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    genesis_block.number + 1,
                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the inverted WAL notification, reverting the ExEx head block.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );

        // Second item: the notification that was sent through the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
830
    /// Checks that polling the stream during a backfill drains the notifications channel.
    ///
    /// The channel has capacity 1 and is filled before the stream is polled. After the first
    /// poll (which yields the backfill), a second send must succeed, which shows that the
    /// first notification was taken off the channel and buffered. The buffered notification
    /// and the probe are then delivered in order.
    #[tokio::test]
    async fn exex_notifications_backfill_drains_channel() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory.clone())?;

        // Insert the node head (block 1, child of genesis) into the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw.insert_block(&node_head_block)?;
        provider_rw.commit()?;

        // The ExEx head is genesis, so a backfill of block 1 is required.
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // A commit above the node head; it must be buffered during the backfill.
        let post_backfill_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        // A second commit, used to probe whether the channel has free capacity.
        let probe_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 2,
                    BlockParams { parent: None, ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                BTreeMap::new(),
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        // Fill the single slot of the channel.
        notifications_tx.send(post_backfill_notification.clone()).await?;

        // With the slot taken, a non-blocking send must fail.
        assert!(
            notifications_tx.try_send(probe_notification.clone()).is_err(),
            "channel should be full before backfill poll"
        );

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthEvmConfig::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First poll: yields the backfill of block 1 and, as a side effect, takes the
        // waiting notification off the channel.
        let backfill_result = notifications.next().await.transpose()?;
        assert_eq!(
            backfill_result,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.evm_config.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // The slot is free again, so the probe can now be sent without blocking.
        assert!(
            notifications_tx.try_send(probe_notification.clone()).is_ok(),
            "channel should have been drained during backfill poll"
        );

        // The notification buffered during the backfill comes first...
        let buffered = notifications.next().await.transpose()?;
        assert_eq!(buffered, Some(post_backfill_notification));

        // ...followed by the probe that was sent afterwards.
        let probe = notifications.next().await.transpose()?;
        assert_eq!(probe, Some(probe_notification));

        Ok(())
    }
961}