1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_evm::execute::BlockExecutorProvider;
6use reth_exex_types::ExExHead;
7use reth_node_api::NodePrimitives;
8use reth_primitives::EthPrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12 fmt::Debug,
13 pin::Pin,
14 sync::Arc,
15 task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25 E: BlockExecutorProvider,
26{
27 inner: ExExNotificationsInner<P, E>,
28}
29
/// A stream of [`ExExNotification`]s that can be switched between a mode with a configured
/// [`ExExHead`] and a mode without one.
///
/// Every item is an [`eyre::Result`], so implementations can surface errors through the stream.
pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
    Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
{
    /// Switches the stream, in place, to the mode without a head.
    ///
    /// The [`ExExNotifications`] implementation keeps its current state as-is when the stream
    /// is already configured without a head.
    fn set_without_head(&mut self);

    /// Switches the stream, in place, to the mode with the given `exex_head`.
    ///
    /// The [`ExExNotifications`] implementation rebuilds the head-aware state with `exex_head`
    /// even when a head was already configured.
    fn set_with_head(&mut self, exex_head: ExExHead);

    /// Consuming counterpart of [`Self::set_without_head`]: returns the stream configured
    /// without a head.
    fn without_head(self) -> Self
    where
        Self: Sized;

    /// Consuming counterpart of [`Self::set_with_head`]: returns the stream configured with
    /// the given `exex_head`.
    fn with_head(self, exex_head: ExExHead) -> Self
    where
        Self: Sized;
}
65
66#[derive(Debug)]
67#[allow(clippy::large_enum_variant)]
68enum ExExNotificationsInner<P, E>
69where
70 E: BlockExecutorProvider,
71{
72 WithoutHead(ExExNotificationsWithoutHead<P, E>),
74 WithHead(ExExNotificationsWithHead<P, E>),
77 Invalid,
80}
81
impl<P, E> ExExNotifications<P, E>
where
    E: BlockExecutorProvider,
{
    /// Creates a new stream of [`ExExNotifications`] that starts out without a head.
    ///
    /// All arguments are forwarded unchanged to the head-less stream, which keeps them so the
    /// stream can later be switched to the head-aware mode.
    pub const fn new(
        node_head: BlockNumHash,
        provider: P,
        executor: E,
        notifications: Receiver<ExExNotification<E::Primitives>>,
        wal_handle: WalHandle<E::Primitives>,
    ) -> Self {
        Self {
            inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
                node_head,
                provider,
                executor,
                notifications,
                wal_handle,
            )),
        }
    }
}
105
106impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
107where
108 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
109 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
110 + Clone
111 + Unpin
112 + 'static,
113{
114 fn set_without_head(&mut self) {
115 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
116 self.inner = ExExNotificationsInner::WithoutHead(match current {
117 ExExNotificationsInner::WithoutHead(notifications) => notifications,
118 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
119 notifications.initial_local_head,
120 notifications.provider,
121 notifications.executor,
122 notifications.notifications,
123 notifications.wal_handle,
124 ),
125 ExExNotificationsInner::Invalid => unreachable!(),
126 });
127 }
128
129 fn set_with_head(&mut self, exex_head: ExExHead) {
130 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
131 self.inner = ExExNotificationsInner::WithHead(match current {
132 ExExNotificationsInner::WithoutHead(notifications) => {
133 notifications.with_head(exex_head)
134 }
135 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new(
136 notifications.initial_local_head,
137 notifications.provider,
138 notifications.executor,
139 notifications.notifications,
140 notifications.wal_handle,
141 exex_head,
142 ),
143 ExExNotificationsInner::Invalid => unreachable!(),
144 });
145 }
146
147 fn without_head(mut self) -> Self {
148 self.set_without_head();
149 self
150 }
151
152 fn with_head(mut self, exex_head: ExExHead) -> Self {
153 self.set_with_head(exex_head);
154 self
155 }
156}
157
158impl<P, E> Stream for ExExNotifications<P, E>
159where
160 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
161 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
162 + Clone
163 + Unpin
164 + 'static,
165{
166 type Item = eyre::Result<ExExNotification<E::Primitives>>;
167
168 fn poll_next(
169 self: std::pin::Pin<&mut Self>,
170 cx: &mut std::task::Context<'_>,
171 ) -> std::task::Poll<Option<Self::Item>> {
172 match &mut self.get_mut().inner {
173 ExExNotificationsInner::WithoutHead(notifications) => {
174 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
175 }
176 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
177 ExExNotificationsInner::Invalid => unreachable!(),
178 }
179 }
180}
181
182pub struct ExExNotificationsWithoutHead<P, E>
184where
185 E: BlockExecutorProvider,
186{
187 node_head: BlockNumHash,
188 provider: P,
189 executor: E,
190 notifications: Receiver<ExExNotification<E::Primitives>>,
191 wal_handle: WalHandle<E::Primitives>,
192}
193
194impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
195where
196 E: Debug + BlockExecutorProvider,
197{
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("ExExNotifications")
200 .field("provider", &self.provider)
201 .field("executor", &self.executor)
202 .field("notifications", &self.notifications)
203 .finish()
204 }
205}
206
207impl<P, E> ExExNotificationsWithoutHead<P, E>
208where
209 E: BlockExecutorProvider,
210{
211 const fn new(
213 node_head: BlockNumHash,
214 provider: P,
215 executor: E,
216 notifications: Receiver<ExExNotification<E::Primitives>>,
217 wal_handle: WalHandle<E::Primitives>,
218 ) -> Self {
219 Self { node_head, provider, executor, notifications, wal_handle }
220 }
221
222 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
224 ExExNotificationsWithHead::new(
225 self.node_head,
226 self.provider,
227 self.executor,
228 self.notifications,
229 self.wal_handle,
230 head,
231 )
232 }
233}
234
235impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
236where
237 E: Unpin + BlockExecutorProvider,
238{
239 type Item = ExExNotification<E::Primitives>;
240
241 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
242 self.get_mut().notifications.poll_recv(cx)
243 }
244}
245
246#[derive(Debug)]
255pub struct ExExNotificationsWithHead<P, E>
256where
257 E: BlockExecutorProvider,
258{
259 initial_local_head: BlockNumHash,
261 provider: P,
262 executor: E,
263 notifications: Receiver<ExExNotification<E::Primitives>>,
264 wal_handle: WalHandle<E::Primitives>,
265 initial_exex_head: ExExHead,
267
268 pending_check_canonical: bool,
271 pending_check_backfill: bool,
274 backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
276}
277
impl<P, E> ExExNotificationsWithHead<P, E>
where
    E: BlockExecutorProvider,
{
    /// Creates a new head-aware notifications stream.
    ///
    /// `node_head` and `exex_head` are stored as the initial local and ExEx heads. Both the
    /// canonical check and the backfill check start out pending, and no backfill job exists
    /// yet.
    const fn new(
        node_head: BlockNumHash,
        provider: P,
        executor: E,
        notifications: Receiver<ExExNotification<E::Primitives>>,
        wal_handle: WalHandle<E::Primitives>,
        exex_head: ExExHead,
    ) -> Self {
        Self {
            initial_local_head: node_head,
            provider,
            executor,
            notifications,
            wal_handle,
            initial_exex_head: exex_head,
            pending_check_canonical: true,
            pending_check_backfill: true,
            backfill_job: None,
        }
    }
}
304
305impl<P, E> ExExNotificationsWithHead<P, E>
306where
307 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
308 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
309 + Clone
310 + Unpin
311 + 'static,
312{
313 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
319 if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
320 self.initial_exex_head.block.number <= self.initial_local_head.number
321 {
322 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
324 return Ok(None)
325 }
326
327 let Some(notification) = self
332 .wal_handle
333 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
334 else {
335 if self.initial_exex_head.block.number > self.initial_local_head.number {
337 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
338 return Ok(None);
339 }
340
341 return Err(eyre::eyre!(
342 "Could not find notification for block hash {:?} in the WAL",
343 self.initial_exex_head.block.hash
344 ))
345 };
346
347 let committed_chain = notification.committed_chain().unwrap();
349 let new_exex_head =
350 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
351 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
352 self.initial_exex_head.block = new_exex_head;
353
354 Ok(Some(notification.into_inverted()))
357 }
358
359 fn check_backfill(&mut self) -> eyre::Result<()> {
370 let backfill_job_factory =
371 BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
372 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
373 std::cmp::Ordering::Less => {
374 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
376 let backfill = backfill_job_factory
377 .backfill(
378 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
379 )
380 .into_stream();
381 self.backfill_job = Some(backfill);
382 }
383 std::cmp::Ordering::Equal => {
384 debug!(target: "exex::notifications", "ExEx is at the node head");
385 }
386 std::cmp::Ordering::Greater => {
387 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
388 }
389 };
390
391 Ok(())
392 }
393}
394
395impl<P, E> Stream for ExExNotificationsWithHead<P, E>
396where
397 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
398 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
399 + Clone
400 + Unpin
401 + 'static,
402{
403 type Item = eyre::Result<ExExNotification<E::Primitives>>;
404
405 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
406 let this = self.get_mut();
407
408 if this.pending_check_canonical {
410 if let Some(canonical_notification) = this.check_canonical()? {
411 return Poll::Ready(Some(Ok(canonical_notification)))
412 }
413
414 this.pending_check_canonical = false;
416 }
417
418 if this.pending_check_backfill {
420 this.check_backfill()?;
421 this.pending_check_backfill = false;
422 }
423
424 if let Some(backfill_job) = &mut this.backfill_job {
426 debug!(target: "exex::notifications", "Polling backfill job");
427 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
428 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
429 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
430 new: Arc::new(chain),
431 })))
432 }
433
434 this.backfill_job = None;
436 }
437
438 loop {
440 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
441 return Poll::Ready(None)
442 };
443
444 if let Some(committed) = notification.committed_chain() {
446 if this.initial_exex_head.block.number >= committed.tip().number() {
448 continue
449 }
450 }
451
452 return Poll::Ready(Some(Ok(notification)))
453 }
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460 use crate::Wal;
461 use alloy_consensus::Header;
462 use alloy_eips::BlockNumHash;
463 use eyre::OptionExt;
464 use futures::StreamExt;
465 use reth_db_common::init::init_genesis;
466 use reth_evm_ethereum::execute::EthExecutorProvider;
467 use reth_primitives::Block;
468 use reth_primitives_traits::Block as _;
469 use reth_provider::{
470 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
471 Chain, DatabaseProviderFactory, StorageLocation,
472 };
473 use reth_testing_utils::generators::{self, random_block, BlockParams};
474 use tokio::sync::mpsc;
475
    /// The ExEx head is the genesis block while the node head is one block further: the
    /// stream is expected to backfill the missing block before forwarding the channel
    /// notification.
    #[tokio::test]
    async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory.clone())?;

        // Insert a block on top of genesis into the database; it becomes the node head.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let provider_rw = provider_factory.provider_rw()?;
        provider_rw
            .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
        provider_rw.commit()?;

        // The ExEx head stays at genesis, i.e. one block behind the node head.
        let node_head = node_head_block.num_hash();
        let exex_head =
            ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };

        // Live notification committing a block on top of the node head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the backfilled chain for block 1, the block the ExEx is missing.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(ExExNotification::ChainCommitted {
                new: Arc::new(
                    BackfillJobFactory::new(
                        notifications.executor.clone(),
                        notifications.provider.clone()
                    )
                    .backfill(1..=1)
                    .next()
                    .ok_or_eyre("failed to backfill")??
                )
            })
        );

        // Second item: the notification that was sent over the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(notification));

        Ok(())
    }
552
    /// The ExEx head and the node head are both the genesis block: the stream is expected to
    /// forward the channel notification as its first item.
    #[tokio::test]
    async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Both heads point at genesis.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead { block: node_head };

        // Live notification committing a block on top of the shared head.
        let notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![Block {
                    header: Header {
                        parent_hash: node_head.hash,
                        number: node_head.number + 1,
                        ..Default::default()
                    },
                    ..Default::default()
                }
                .seal_slow()
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // The very first item is the notification that was sent over the channel.
        let new_notification = notifications.next().await.transpose()?;
        assert_eq!(new_notification, Some(notification));

        Ok(())
    }
604
    /// The ExEx head has the same number as the node head but is a different block that was
    /// only committed to the WAL: the stream is expected to first revert the ExEx head block,
    /// then backfill the node head block, then forward the channel notification.
    #[tokio::test]
    async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // Insert the node head block (genesis + 1) into the database.
        let node_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        )
        .try_recover()?;
        let node_head = node_head_block.num_hash();
        let provider_rw = provider.database_provider_rw()?;
        provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
        provider_rw.commit()?;
        // Expected backfill result for the node head block.
        let node_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(
                BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
                    .backfill(node_head.number..=node_head.number)
                    .next()
                    .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
            ),
        };

        // A second random block at the same number, committed to the WAL only; it serves as
        // the ExEx head.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head = ExExHead { block: exex_head_block.num_hash() };
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // Live notification committing a block on top of the node head.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    node_head.number + 1,
                    BlockParams { parent: Some(node_head.hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the inverted WAL notification, reverting the ExEx head block.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );
        // Second item: the backfill of the node head block.
        assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
        // Third item: the notification that was sent over the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
694
    /// The ExEx head is one block ahead of the node head (genesis) and was committed to the
    /// WAL: the stream is expected to first revert the ExEx head block and then forward the
    /// channel notification.
    #[tokio::test]
    async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();
        let mut rng = generators::rng();

        let temp_dir = tempfile::tempdir().unwrap();
        let wal = Wal::new(temp_dir.path()).unwrap();

        let provider_factory = create_test_provider_factory();
        let genesis_hash = init_genesis(&provider_factory)?;
        let genesis_block = provider_factory
            .block(genesis_hash.into())?
            .ok_or_else(|| eyre::eyre!("genesis block not found"))?;

        let provider = BlockchainProvider::new(provider_factory)?;

        // The ExEx head block (genesis + 1) is committed to the WAL only.
        let exex_head_block = random_block(
            &mut rng,
            genesis_block.number + 1,
            BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
        );
        let exex_head_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![exex_head_block.clone().try_recover()?],
                Default::default(),
                None,
            )),
        };
        wal.commit(&exex_head_notification)?;

        // The node head is genesis, so the ExEx head is ahead of it.
        let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
        let exex_head = ExExHead {
            block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
        };

        // Live notification committing another block on top of genesis.
        let new_notification = ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(
                vec![random_block(
                    &mut rng,
                    genesis_block.number + 1,
                    BlockParams { parent: Some(genesis_hash), ..Default::default() },
                )
                .try_recover()?],
                Default::default(),
                None,
            )),
        };

        let (notifications_tx, notifications_rx) = mpsc::channel(1);

        notifications_tx.send(new_notification.clone()).await?;

        let mut notifications = ExExNotificationsWithoutHead::new(
            node_head,
            provider,
            EthExecutorProvider::mainnet(),
            notifications_rx,
            wal.handle(),
        )
        .with_head(exex_head);

        // First item: the inverted WAL notification, reverting the ExEx head block.
        assert_eq!(
            notifications.next().await.transpose()?,
            Some(exex_head_notification.into_inverted())
        );

        // Second item: the notification that was sent over the channel.
        assert_eq!(notifications.next().await.transpose()?, Some(new_notification));

        Ok(())
    }
768}