1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::execute::BlockExecutorProvider;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12 fmt::Debug,
13 pin::Pin,
14 sync::Arc,
15 task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25 E: BlockExecutorProvider,
26{
27 inner: ExExNotificationsInner<P, E>,
28}
29
30pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34 Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36 fn set_without_head(&mut self);
42
43 fn set_with_head(&mut self, exex_head: ExExHead);
50
51 fn without_head(self) -> Self
55 where
56 Self: Sized;
57
58 fn with_head(self, exex_head: ExExHead) -> Self
62 where
63 Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69 E: BlockExecutorProvider,
70{
71 WithoutHead(ExExNotificationsWithoutHead<P, E>),
73 WithHead(Box<ExExNotificationsWithHead<P, E>>),
76 Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83 E: BlockExecutorProvider,
84{
85 pub const fn new(
87 node_head: BlockNumHash,
88 provider: P,
89 executor: E,
90 notifications: Receiver<ExExNotification<E::Primitives>>,
91 wal_handle: WalHandle<E::Primitives>,
92 ) -> Self {
93 Self {
94 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95 node_head,
96 provider,
97 executor,
98 notifications,
99 wal_handle,
100 )),
101 }
102 }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
109 + Clone
110 + Unpin
111 + 'static,
112{
113 fn set_without_head(&mut self) {
114 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
115 self.inner = ExExNotificationsInner::WithoutHead(match current {
116 ExExNotificationsInner::WithoutHead(notifications) => notifications,
117 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
118 notifications.initial_local_head,
119 notifications.provider,
120 notifications.executor,
121 notifications.notifications,
122 notifications.wal_handle,
123 ),
124 ExExNotificationsInner::Invalid => unreachable!(),
125 });
126 }
127
128 fn set_with_head(&mut self, exex_head: ExExHead) {
129 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
130 self.inner = ExExNotificationsInner::WithHead(match current {
131 ExExNotificationsInner::WithoutHead(notifications) => {
132 Box::new(notifications.with_head(exex_head))
133 }
134 ExExNotificationsInner::WithHead(notifications) => {
135 Box::new(ExExNotificationsWithHead::new(
136 notifications.initial_local_head,
137 notifications.provider,
138 notifications.executor,
139 notifications.notifications,
140 notifications.wal_handle,
141 exex_head,
142 ))
143 }
144 ExExNotificationsInner::Invalid => unreachable!(),
145 });
146 }
147
148 fn without_head(mut self) -> Self {
149 self.set_without_head();
150 self
151 }
152
153 fn with_head(mut self, exex_head: ExExHead) -> Self {
154 self.set_with_head(exex_head);
155 self
156 }
157}
158
159impl<P, E> Stream for ExExNotifications<P, E>
160where
161 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
162 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
163 + Clone
164 + Unpin
165 + 'static,
166{
167 type Item = eyre::Result<ExExNotification<E::Primitives>>;
168
169 fn poll_next(
170 self: std::pin::Pin<&mut Self>,
171 cx: &mut std::task::Context<'_>,
172 ) -> std::task::Poll<Option<Self::Item>> {
173 match &mut self.get_mut().inner {
174 ExExNotificationsInner::WithoutHead(notifications) => {
175 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
176 }
177 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
178 ExExNotificationsInner::Invalid => unreachable!(),
179 }
180 }
181}
182
183pub struct ExExNotificationsWithoutHead<P, E>
185where
186 E: BlockExecutorProvider,
187{
188 node_head: BlockNumHash,
189 provider: P,
190 executor: E,
191 notifications: Receiver<ExExNotification<E::Primitives>>,
192 wal_handle: WalHandle<E::Primitives>,
193}
194
195impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
196where
197 E: Debug + BlockExecutorProvider,
198{
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("ExExNotifications")
201 .field("provider", &self.provider)
202 .field("executor", &self.executor)
203 .field("notifications", &self.notifications)
204 .finish()
205 }
206}
207
208impl<P, E> ExExNotificationsWithoutHead<P, E>
209where
210 E: BlockExecutorProvider,
211{
212 const fn new(
214 node_head: BlockNumHash,
215 provider: P,
216 executor: E,
217 notifications: Receiver<ExExNotification<E::Primitives>>,
218 wal_handle: WalHandle<E::Primitives>,
219 ) -> Self {
220 Self { node_head, provider, executor, notifications, wal_handle }
221 }
222
223 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
225 ExExNotificationsWithHead::new(
226 self.node_head,
227 self.provider,
228 self.executor,
229 self.notifications,
230 self.wal_handle,
231 head,
232 )
233 }
234}
235
236impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
237where
238 E: Unpin + BlockExecutorProvider,
239{
240 type Item = ExExNotification<E::Primitives>;
241
242 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 self.get_mut().notifications.poll_recv(cx)
244 }
245}
246
247#[derive(Debug)]
256pub struct ExExNotificationsWithHead<P, E>
257where
258 E: BlockExecutorProvider,
259{
260 initial_local_head: BlockNumHash,
262 provider: P,
263 executor: E,
264 notifications: Receiver<ExExNotification<E::Primitives>>,
265 wal_handle: WalHandle<E::Primitives>,
266 initial_exex_head: ExExHead,
268
269 pending_check_canonical: bool,
272 pending_check_backfill: bool,
275 backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
277}
278
279impl<P, E> ExExNotificationsWithHead<P, E>
280where
281 E: BlockExecutorProvider,
282{
283 const fn new(
285 node_head: BlockNumHash,
286 provider: P,
287 executor: E,
288 notifications: Receiver<ExExNotification<E::Primitives>>,
289 wal_handle: WalHandle<E::Primitives>,
290 exex_head: ExExHead,
291 ) -> Self {
292 Self {
293 initial_local_head: node_head,
294 provider,
295 executor,
296 notifications,
297 wal_handle,
298 initial_exex_head: exex_head,
299 pending_check_canonical: true,
300 pending_check_backfill: true,
301 backfill_job: None,
302 }
303 }
304}
305
306impl<P, E> ExExNotificationsWithHead<P, E>
307where
308 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
309 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
310 + Clone
311 + Unpin
312 + 'static,
313{
314 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
320 if self.provider.is_known(&self.initial_exex_head.block.hash)? &&
321 self.initial_exex_head.block.number <= self.initial_local_head.number
322 {
323 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
325 return Ok(None)
326 }
327
328 let Some(notification) = self
333 .wal_handle
334 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
335 else {
336 if self.initial_exex_head.block.number > self.initial_local_head.number {
338 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
339 return Ok(None);
340 }
341
342 return Err(eyre::eyre!(
343 "Could not find notification for block hash {:?} in the WAL",
344 self.initial_exex_head.block.hash
345 ))
346 };
347
348 let committed_chain = notification.committed_chain().unwrap();
350 let new_exex_head =
351 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
352 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
353 self.initial_exex_head.block = new_exex_head;
354
355 Ok(Some(notification.into_inverted()))
358 }
359
360 fn check_backfill(&mut self) -> eyre::Result<()> {
371 let backfill_job_factory =
372 BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
373 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
374 std::cmp::Ordering::Less => {
375 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
377 let backfill = backfill_job_factory
378 .backfill(
379 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
380 )
381 .into_stream();
382 self.backfill_job = Some(backfill);
383 }
384 std::cmp::Ordering::Equal => {
385 debug!(target: "exex::notifications", "ExEx is at the node head");
386 }
387 std::cmp::Ordering::Greater => {
388 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
389 }
390 };
391
392 Ok(())
393 }
394}
395
396impl<P, E> Stream for ExExNotificationsWithHead<P, E>
397where
398 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
399 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
400 + Clone
401 + Unpin
402 + 'static,
403{
404 type Item = eyre::Result<ExExNotification<E::Primitives>>;
405
406 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
407 let this = self.get_mut();
408
409 if this.pending_check_canonical {
411 if let Some(canonical_notification) = this.check_canonical()? {
412 return Poll::Ready(Some(Ok(canonical_notification)))
413 }
414
415 this.pending_check_canonical = false;
417 }
418
419 if this.pending_check_backfill {
421 this.check_backfill()?;
422 this.pending_check_backfill = false;
423 }
424
425 if let Some(backfill_job) = &mut this.backfill_job {
427 debug!(target: "exex::notifications", "Polling backfill job");
428 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
429 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
430 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
431 new: Arc::new(chain),
432 })))
433 }
434
435 this.backfill_job = None;
437 }
438
439 loop {
441 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
442 return Poll::Ready(None)
443 };
444
445 if let Some(committed) = notification.committed_chain() {
447 if this.initial_exex_head.block.number >= committed.tip().number() {
449 continue
450 }
451 }
452
453 return Poll::Ready(Some(Ok(notification)))
454 }
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461 use crate::Wal;
462 use alloy_consensus::Header;
463 use alloy_eips::BlockNumHash;
464 use eyre::OptionExt;
465 use futures::StreamExt;
466 use reth_db_common::init::init_genesis;
467 use reth_ethereum_primitives::Block;
468 use reth_evm_ethereum::execute::EthExecutorProvider;
469 use reth_primitives_traits::Block as _;
470 use reth_provider::{
471 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
472 Chain, DatabaseProviderFactory, StorageLocation,
473 };
474 use reth_testing_utils::generators::{self, random_block, BlockParams};
475 use tokio::sync::mpsc;
476
477 #[tokio::test]
478 async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
479 let mut rng = generators::rng();
480
481 let temp_dir = tempfile::tempdir().unwrap();
482 let wal = Wal::new(temp_dir.path()).unwrap();
483
484 let provider_factory = create_test_provider_factory();
485 let genesis_hash = init_genesis(&provider_factory)?;
486 let genesis_block = provider_factory
487 .block(genesis_hash.into())?
488 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
489
490 let provider = BlockchainProvider::new(provider_factory.clone())?;
491
492 let node_head_block = random_block(
493 &mut rng,
494 genesis_block.number + 1,
495 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
496 );
497 let provider_rw = provider_factory.provider_rw()?;
498 provider_rw
499 .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?;
500 provider_rw.commit()?;
501
502 let node_head = node_head_block.num_hash();
503 let exex_head =
504 ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
505
506 let notification = ExExNotification::ChainCommitted {
507 new: Arc::new(Chain::new(
508 vec![random_block(
509 &mut rng,
510 node_head.number + 1,
511 BlockParams { parent: Some(node_head.hash), ..Default::default() },
512 )
513 .try_recover()?],
514 Default::default(),
515 None,
516 )),
517 };
518
519 let (notifications_tx, notifications_rx) = mpsc::channel(1);
520
521 notifications_tx.send(notification.clone()).await?;
522
523 let mut notifications = ExExNotificationsWithoutHead::new(
524 node_head,
525 provider,
526 EthExecutorProvider::mainnet(),
527 notifications_rx,
528 wal.handle(),
529 )
530 .with_head(exex_head);
531
532 assert_eq!(
534 notifications.next().await.transpose()?,
535 Some(ExExNotification::ChainCommitted {
536 new: Arc::new(
537 BackfillJobFactory::new(
538 notifications.executor.clone(),
539 notifications.provider.clone()
540 )
541 .backfill(1..=1)
542 .next()
543 .ok_or_eyre("failed to backfill")??
544 )
545 })
546 );
547
548 assert_eq!(notifications.next().await.transpose()?, Some(notification));
550
551 Ok(())
552 }
553
554 #[tokio::test]
555 async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
556 let temp_dir = tempfile::tempdir().unwrap();
557 let wal = Wal::new(temp_dir.path()).unwrap();
558
559 let provider_factory = create_test_provider_factory();
560 let genesis_hash = init_genesis(&provider_factory)?;
561 let genesis_block = provider_factory
562 .block(genesis_hash.into())?
563 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
564
565 let provider = BlockchainProvider::new(provider_factory)?;
566
567 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
568 let exex_head = ExExHead { block: node_head };
569
570 let notification = ExExNotification::ChainCommitted {
571 new: Arc::new(Chain::new(
572 vec![Block {
573 header: Header {
574 parent_hash: node_head.hash,
575 number: node_head.number + 1,
576 ..Default::default()
577 },
578 ..Default::default()
579 }
580 .seal_slow()
581 .try_recover()?],
582 Default::default(),
583 None,
584 )),
585 };
586
587 let (notifications_tx, notifications_rx) = mpsc::channel(1);
588
589 notifications_tx.send(notification.clone()).await?;
590
591 let mut notifications = ExExNotificationsWithoutHead::new(
592 node_head,
593 provider,
594 EthExecutorProvider::mainnet(),
595 notifications_rx,
596 wal.handle(),
597 )
598 .with_head(exex_head);
599
600 let new_notification = notifications.next().await.transpose()?;
601 assert_eq!(new_notification, Some(notification));
602
603 Ok(())
604 }
605
606 #[tokio::test]
607 async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
608 let mut rng = generators::rng();
609
610 let temp_dir = tempfile::tempdir().unwrap();
611 let wal = Wal::new(temp_dir.path()).unwrap();
612
613 let provider_factory = create_test_provider_factory();
614 let genesis_hash = init_genesis(&provider_factory)?;
615 let genesis_block = provider_factory
616 .block(genesis_hash.into())?
617 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
618
619 let provider = BlockchainProvider::new(provider_factory)?;
620
621 let node_head_block = random_block(
622 &mut rng,
623 genesis_block.number + 1,
624 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
625 )
626 .try_recover()?;
627 let node_head = node_head_block.num_hash();
628 let provider_rw = provider.database_provider_rw()?;
629 provider_rw.insert_block(node_head_block, StorageLocation::Database)?;
630 provider_rw.commit()?;
631 let node_head_notification = ExExNotification::ChainCommitted {
632 new: Arc::new(
633 BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
634 .backfill(node_head.number..=node_head.number)
635 .next()
636 .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
637 ),
638 };
639
640 let exex_head_block = random_block(
641 &mut rng,
642 genesis_block.number + 1,
643 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
644 );
645 let exex_head = ExExHead { block: exex_head_block.num_hash() };
646 let exex_head_notification = ExExNotification::ChainCommitted {
647 new: Arc::new(Chain::new(
648 vec![exex_head_block.clone().try_recover()?],
649 Default::default(),
650 None,
651 )),
652 };
653 wal.commit(&exex_head_notification)?;
654
655 let new_notification = ExExNotification::ChainCommitted {
656 new: Arc::new(Chain::new(
657 vec![random_block(
658 &mut rng,
659 node_head.number + 1,
660 BlockParams { parent: Some(node_head.hash), ..Default::default() },
661 )
662 .try_recover()?],
663 Default::default(),
664 None,
665 )),
666 };
667
668 let (notifications_tx, notifications_rx) = mpsc::channel(1);
669
670 notifications_tx.send(new_notification.clone()).await?;
671
672 let mut notifications = ExExNotificationsWithoutHead::new(
673 node_head,
674 provider,
675 EthExecutorProvider::mainnet(),
676 notifications_rx,
677 wal.handle(),
678 )
679 .with_head(exex_head);
680
681 assert_eq!(
684 notifications.next().await.transpose()?,
685 Some(exex_head_notification.into_inverted())
686 );
687 assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
690 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
692
693 Ok(())
694 }
695
696 #[tokio::test]
697 async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
698 reth_tracing::init_test_tracing();
699 let mut rng = generators::rng();
700
701 let temp_dir = tempfile::tempdir().unwrap();
702 let wal = Wal::new(temp_dir.path()).unwrap();
703
704 let provider_factory = create_test_provider_factory();
705 let genesis_hash = init_genesis(&provider_factory)?;
706 let genesis_block = provider_factory
707 .block(genesis_hash.into())?
708 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
709
710 let provider = BlockchainProvider::new(provider_factory)?;
711
712 let exex_head_block = random_block(
713 &mut rng,
714 genesis_block.number + 1,
715 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
716 );
717 let exex_head_notification = ExExNotification::ChainCommitted {
718 new: Arc::new(Chain::new(
719 vec![exex_head_block.clone().try_recover()?],
720 Default::default(),
721 None,
722 )),
723 };
724 wal.commit(&exex_head_notification)?;
725
726 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
727 let exex_head = ExExHead {
728 block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
729 };
730
731 let new_notification = ExExNotification::ChainCommitted {
732 new: Arc::new(Chain::new(
733 vec![random_block(
734 &mut rng,
735 genesis_block.number + 1,
736 BlockParams { parent: Some(genesis_hash), ..Default::default() },
737 )
738 .try_recover()?],
739 Default::default(),
740 None,
741 )),
742 };
743
744 let (notifications_tx, notifications_rx) = mpsc::channel(1);
745
746 notifications_tx.send(new_notification.clone()).await?;
747
748 let mut notifications = ExExNotificationsWithoutHead::new(
749 node_head,
750 provider,
751 EthExecutorProvider::mainnet(),
752 notifications_rx,
753 wal.handle(),
754 )
755 .with_head(exex_head);
756
757 assert_eq!(
760 notifications.next().await.transpose()?,
761 Some(exex_head_notification.into_inverted())
762 );
763
764 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
766
767 Ok(())
768 }
769}