1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_stages_api::ExecutionStageThresholds;
11use reth_tracing::tracing::debug;
12use std::{
13 fmt::Debug,
14 pin::Pin,
15 sync::Arc,
16 task::{ready, Context, Poll},
17};
18use tokio::sync::mpsc::Receiver;
19
20#[derive(Debug)]
24pub struct ExExNotifications<P, E>
25where
26 E: ConfigureEvm,
27{
28 inner: ExExNotificationsInner<P, E>,
29}
30
31pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
35 Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
36{
37 fn set_without_head(&mut self);
43
44 fn set_with_head(&mut self, exex_head: ExExHead);
51
52 fn without_head(self) -> Self
56 where
57 Self: Sized;
58
59 fn with_head(self, exex_head: ExExHead) -> Self
63 where
64 Self: Sized;
65
66 fn set_backfill_thresholds(&mut self, _thresholds: ExecutionStageThresholds) {}
74}
75
76#[derive(Debug)]
77enum ExExNotificationsInner<P, E>
78where
79 E: ConfigureEvm,
80{
81 WithoutHead(ExExNotificationsWithoutHead<P, E>),
83 WithHead(Box<ExExNotificationsWithHead<P, E>>),
86 Invalid,
89}
90
91impl<P, E> ExExNotifications<P, E>
92where
93 E: ConfigureEvm,
94{
95 pub const fn new(
97 node_head: BlockNumHash,
98 provider: P,
99 evm_config: E,
100 notifications: Receiver<ExExNotification<E::Primitives>>,
101 wal_handle: WalHandle<E::Primitives>,
102 ) -> Self {
103 Self {
104 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
105 node_head,
106 provider,
107 evm_config,
108 notifications,
109 wal_handle,
110 )),
111 }
112 }
113}
114
115impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
116where
117 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
118 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
119{
120 fn set_without_head(&mut self) {
121 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
122 self.inner = ExExNotificationsInner::WithoutHead(match current {
123 ExExNotificationsInner::WithoutHead(notifications) => notifications,
124 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
125 notifications.initial_local_head,
126 notifications.provider,
127 notifications.evm_config,
128 notifications.notifications,
129 notifications.wal_handle,
130 ),
131 ExExNotificationsInner::Invalid => unreachable!(),
132 });
133 }
134
135 fn set_with_head(&mut self, exex_head: ExExHead) {
136 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
137 self.inner = ExExNotificationsInner::WithHead(match current {
138 ExExNotificationsInner::WithoutHead(notifications) => {
139 Box::new(notifications.with_head(exex_head))
140 }
141 ExExNotificationsInner::WithHead(notifications) => {
142 Box::new(ExExNotificationsWithHead::new(
143 notifications.initial_local_head,
144 notifications.provider,
145 notifications.evm_config,
146 notifications.notifications,
147 notifications.wal_handle,
148 exex_head,
149 ))
150 }
151 ExExNotificationsInner::Invalid => unreachable!(),
152 });
153 }
154
155 fn without_head(mut self) -> Self {
156 self.set_without_head();
157 self
158 }
159
160 fn with_head(mut self, exex_head: ExExHead) -> Self {
161 self.set_with_head(exex_head);
162 self
163 }
164
165 fn set_backfill_thresholds(&mut self, thresholds: ExecutionStageThresholds) {
166 if let ExExNotificationsInner::WithHead(notifications) = &mut self.inner {
167 notifications.backfill_thresholds = Some(thresholds);
168 }
169 }
170}
171
172impl<P, E> Stream for ExExNotifications<P, E>
173where
174 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
175 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
176{
177 type Item = eyre::Result<ExExNotification<E::Primitives>>;
178
179 fn poll_next(
180 self: std::pin::Pin<&mut Self>,
181 cx: &mut std::task::Context<'_>,
182 ) -> std::task::Poll<Option<Self::Item>> {
183 match &mut self.get_mut().inner {
184 ExExNotificationsInner::WithoutHead(notifications) => {
185 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
186 }
187 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
188 ExExNotificationsInner::Invalid => unreachable!(),
189 }
190 }
191}
192
193pub struct ExExNotificationsWithoutHead<P, E>
195where
196 E: ConfigureEvm,
197{
198 node_head: BlockNumHash,
199 provider: P,
200 evm_config: E,
201 notifications: Receiver<ExExNotification<E::Primitives>>,
202 wal_handle: WalHandle<E::Primitives>,
203}
204
205impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
206where
207 E: ConfigureEvm + Debug,
208{
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("ExExNotifications")
211 .field("provider", &self.provider)
212 .field("evm_config", &self.evm_config)
213 .field("notifications", &self.notifications)
214 .finish()
215 }
216}
217
218impl<P, E> ExExNotificationsWithoutHead<P, E>
219where
220 E: ConfigureEvm,
221{
222 const fn new(
224 node_head: BlockNumHash,
225 provider: P,
226 evm_config: E,
227 notifications: Receiver<ExExNotification<E::Primitives>>,
228 wal_handle: WalHandle<E::Primitives>,
229 ) -> Self {
230 Self { node_head, provider, evm_config, notifications, wal_handle }
231 }
232
233 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
235 ExExNotificationsWithHead::new(
236 self.node_head,
237 self.provider,
238 self.evm_config,
239 self.notifications,
240 self.wal_handle,
241 head,
242 )
243 }
244}
245
246impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
247where
248 E: ConfigureEvm,
249{
250 type Item = ExExNotification<E::Primitives>;
251
252 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253 self.get_mut().notifications.poll_recv(cx)
254 }
255}
256
257#[derive(Debug)]
266pub struct ExExNotificationsWithHead<P, E>
267where
268 E: ConfigureEvm,
269{
270 initial_local_head: BlockNumHash,
272 provider: P,
273 evm_config: E,
274 notifications: Receiver<ExExNotification<E::Primitives>>,
275 wal_handle: WalHandle<E::Primitives>,
276 initial_exex_head: ExExHead,
278
279 pending_check_canonical: bool,
282 pending_check_backfill: bool,
285 backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
287 backfill_thresholds: Option<ExecutionStageThresholds>,
289}
290
291impl<P, E> ExExNotificationsWithHead<P, E>
292where
293 E: ConfigureEvm,
294{
295 const fn new(
297 node_head: BlockNumHash,
298 provider: P,
299 evm_config: E,
300 notifications: Receiver<ExExNotification<E::Primitives>>,
301 wal_handle: WalHandle<E::Primitives>,
302 exex_head: ExExHead,
303 ) -> Self {
304 Self {
305 initial_local_head: node_head,
306 provider,
307 evm_config,
308 notifications,
309 wal_handle,
310 initial_exex_head: exex_head,
311 pending_check_canonical: true,
312 pending_check_backfill: true,
313 backfill_job: None,
314 backfill_thresholds: None,
315 }
316 }
317
318 pub const fn with_backfill_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
327 self.backfill_thresholds = Some(thresholds);
328 self
329 }
330}
331
332impl<P, E> ExExNotificationsWithHead<P, E>
333where
334 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
335 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
336{
337 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
343 if self.provider.is_known(self.initial_exex_head.block.hash)? &&
344 self.initial_exex_head.block.number <= self.initial_local_head.number
345 {
346 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
348 return Ok(None)
349 }
350
351 let Some(notification) = self
356 .wal_handle
357 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
358 else {
359 if self.initial_exex_head.block.number > self.initial_local_head.number {
361 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
362 return Ok(None);
363 }
364
365 return Err(eyre::eyre!(
366 "Could not find notification for block hash {:?} in the WAL",
367 self.initial_exex_head.block.hash
368 ))
369 };
370
371 let committed_chain = notification.committed_chain().unwrap();
373 let new_exex_head =
374 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
375 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
376 self.initial_exex_head.block = new_exex_head;
377
378 Ok(Some(notification.into_inverted()))
381 }
382
383 fn check_backfill(&mut self) -> eyre::Result<()> {
394 let mut backfill_job_factory =
395 BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
396 if let Some(thresholds) = self.backfill_thresholds.clone() {
397 backfill_job_factory = backfill_job_factory.with_thresholds(thresholds);
398 }
399 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
400 std::cmp::Ordering::Less => {
401 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
403 let backfill = backfill_job_factory
404 .backfill(
405 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
406 )
407 .into_stream();
408 self.backfill_job = Some(backfill);
409 }
410 std::cmp::Ordering::Equal => {
411 debug!(target: "exex::notifications", "ExEx is at the node head");
412 }
413 std::cmp::Ordering::Greater => {
414 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
415 }
416 };
417
418 Ok(())
419 }
420}
421
422impl<P, E> Stream for ExExNotificationsWithHead<P, E>
423where
424 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
425 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
426{
427 type Item = eyre::Result<ExExNotification<E::Primitives>>;
428
429 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
430 let this = self.get_mut();
431
432 if this.pending_check_canonical {
434 if let Some(canonical_notification) = this.check_canonical()? {
435 return Poll::Ready(Some(Ok(canonical_notification)))
436 }
437
438 this.pending_check_canonical = false;
440 }
441
442 if this.pending_check_backfill {
444 this.check_backfill()?;
445 this.pending_check_backfill = false;
446 }
447
448 if let Some(backfill_job) = &mut this.backfill_job {
450 debug!(target: "exex::notifications", "Polling backfill job");
451 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
452 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
453 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
454 new: Arc::new(chain),
455 })))
456 }
457
458 this.backfill_job = None;
460 }
461
462 loop {
464 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
465 return Poll::Ready(None)
466 };
467
468 if let Some(committed) = notification.committed_chain() {
470 if this.initial_exex_head.block.number >= committed.tip().number() {
472 continue
473 }
474 }
475
476 return Poll::Ready(Some(Ok(notification)))
477 }
478 }
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use crate::Wal;
485 use alloy_consensus::Header;
486 use alloy_eips::BlockNumHash;
487 use eyre::OptionExt;
488 use futures::StreamExt;
489 use reth_db_common::init::init_genesis;
490 use reth_ethereum_primitives::Block;
491 use reth_evm_ethereum::EthEvmConfig;
492 use reth_primitives_traits::Block as _;
493 use reth_provider::{
494 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
495 Chain, DBProvider, DatabaseProviderFactory,
496 };
497 use reth_testing_utils::generators::{self, random_block, BlockParams};
498 use std::collections::BTreeMap;
499 use tokio::sync::mpsc;
500
501 #[tokio::test]
502 async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
503 let mut rng = generators::rng();
504
505 let temp_dir = tempfile::tempdir().unwrap();
506 let wal = Wal::new(temp_dir.path()).unwrap();
507
508 let provider_factory = create_test_provider_factory();
509 let genesis_hash = init_genesis(&provider_factory)?;
510 let genesis_block = provider_factory
511 .block(genesis_hash.into())?
512 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
513
514 let provider = BlockchainProvider::new(provider_factory.clone())?;
515
516 let node_head_block = random_block(
517 &mut rng,
518 genesis_block.number + 1,
519 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
520 )
521 .try_recover()?;
522 let node_head = node_head_block.num_hash();
523 let provider_rw = provider_factory.provider_rw()?;
524 provider_rw.insert_block(&node_head_block)?;
525 provider_rw.commit()?;
526 let exex_head =
527 ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
528
529 let notification = ExExNotification::ChainCommitted {
530 new: Arc::new(Chain::new(
531 vec![random_block(
532 &mut rng,
533 node_head.number + 1,
534 BlockParams { parent: Some(node_head.hash), ..Default::default() },
535 )
536 .try_recover()?],
537 Default::default(),
538 BTreeMap::new(),
539 )),
540 };
541
542 let (notifications_tx, notifications_rx) = mpsc::channel(1);
543
544 notifications_tx.send(notification.clone()).await?;
545
546 let mut notifications = ExExNotificationsWithoutHead::new(
547 node_head,
548 provider,
549 EthEvmConfig::mainnet(),
550 notifications_rx,
551 wal.handle(),
552 )
553 .with_head(exex_head);
554
555 assert_eq!(
557 notifications.next().await.transpose()?,
558 Some(ExExNotification::ChainCommitted {
559 new: Arc::new(
560 BackfillJobFactory::new(
561 notifications.evm_config.clone(),
562 notifications.provider.clone()
563 )
564 .backfill(1..=1)
565 .next()
566 .ok_or_eyre("failed to backfill")??
567 )
568 })
569 );
570
571 assert_eq!(notifications.next().await.transpose()?, Some(notification));
573
574 Ok(())
575 }
576
577 #[tokio::test]
578 async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
579 let temp_dir = tempfile::tempdir().unwrap();
580 let wal = Wal::new(temp_dir.path()).unwrap();
581
582 let provider_factory = create_test_provider_factory();
583 let genesis_hash = init_genesis(&provider_factory)?;
584 let genesis_block = provider_factory
585 .block(genesis_hash.into())?
586 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
587
588 let provider = BlockchainProvider::new(provider_factory)?;
589
590 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
591 let exex_head = ExExHead { block: node_head };
592
593 let notification = ExExNotification::ChainCommitted {
594 new: Arc::new(Chain::new(
595 vec![Block {
596 header: Header {
597 parent_hash: node_head.hash,
598 number: node_head.number + 1,
599 ..Default::default()
600 },
601 ..Default::default()
602 }
603 .seal_slow()
604 .try_recover()?],
605 Default::default(),
606 BTreeMap::new(),
607 )),
608 };
609
610 let (notifications_tx, notifications_rx) = mpsc::channel(1);
611
612 notifications_tx.send(notification.clone()).await?;
613
614 let mut notifications = ExExNotificationsWithoutHead::new(
615 node_head,
616 provider,
617 EthEvmConfig::mainnet(),
618 notifications_rx,
619 wal.handle(),
620 )
621 .with_head(exex_head);
622
623 let new_notification = notifications.next().await.transpose()?;
624 assert_eq!(new_notification, Some(notification));
625
626 Ok(())
627 }
628
629 #[tokio::test]
630 async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
631 let mut rng = generators::rng();
632
633 let temp_dir = tempfile::tempdir().unwrap();
634 let wal = Wal::new(temp_dir.path()).unwrap();
635
636 let provider_factory = create_test_provider_factory();
637 let genesis_hash = init_genesis(&provider_factory)?;
638 let genesis_block = provider_factory
639 .block(genesis_hash.into())?
640 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
641
642 let provider = BlockchainProvider::new(provider_factory)?;
643
644 let node_head_block = random_block(
645 &mut rng,
646 genesis_block.number + 1,
647 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
648 )
649 .try_recover()?;
650 let node_head = node_head_block.num_hash();
651 let provider_rw = provider.database_provider_rw()?;
652 provider_rw.insert_block(&node_head_block)?;
653 provider_rw.commit()?;
654 let node_head_notification = ExExNotification::ChainCommitted {
655 new: Arc::new(
656 BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
657 .backfill(node_head.number..=node_head.number)
658 .next()
659 .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
660 ),
661 };
662
663 let exex_head_block = random_block(
664 &mut rng,
665 genesis_block.number + 1,
666 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
667 );
668 let exex_head = ExExHead { block: exex_head_block.num_hash() };
669 let exex_head_notification = ExExNotification::ChainCommitted {
670 new: Arc::new(Chain::new(
671 vec![exex_head_block.clone().try_recover()?],
672 Default::default(),
673 BTreeMap::new(),
674 )),
675 };
676 wal.commit(&exex_head_notification)?;
677
678 let new_notification = ExExNotification::ChainCommitted {
679 new: Arc::new(Chain::new(
680 vec![random_block(
681 &mut rng,
682 node_head.number + 1,
683 BlockParams { parent: Some(node_head.hash), ..Default::default() },
684 )
685 .try_recover()?],
686 Default::default(),
687 BTreeMap::new(),
688 )),
689 };
690
691 let (notifications_tx, notifications_rx) = mpsc::channel(1);
692
693 notifications_tx.send(new_notification.clone()).await?;
694
695 let mut notifications = ExExNotificationsWithoutHead::new(
696 node_head,
697 provider,
698 EthEvmConfig::mainnet(),
699 notifications_rx,
700 wal.handle(),
701 )
702 .with_head(exex_head);
703
704 assert_eq!(
707 notifications.next().await.transpose()?,
708 Some(exex_head_notification.into_inverted())
709 );
710 assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
713 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
715
716 Ok(())
717 }
718
719 #[tokio::test]
720 async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
721 reth_tracing::init_test_tracing();
722 let mut rng = generators::rng();
723
724 let temp_dir = tempfile::tempdir().unwrap();
725 let wal = Wal::new(temp_dir.path()).unwrap();
726
727 let provider_factory = create_test_provider_factory();
728 let genesis_hash = init_genesis(&provider_factory)?;
729 let genesis_block = provider_factory
730 .block(genesis_hash.into())?
731 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
732
733 let provider = BlockchainProvider::new(provider_factory)?;
734
735 let exex_head_block = random_block(
736 &mut rng,
737 genesis_block.number + 1,
738 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
739 );
740 let exex_head_notification = ExExNotification::ChainCommitted {
741 new: Arc::new(Chain::new(
742 vec![exex_head_block.clone().try_recover()?],
743 Default::default(),
744 BTreeMap::new(),
745 )),
746 };
747 wal.commit(&exex_head_notification)?;
748
749 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
750 let exex_head = ExExHead {
751 block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
752 };
753
754 let new_notification = ExExNotification::ChainCommitted {
755 new: Arc::new(Chain::new(
756 vec![random_block(
757 &mut rng,
758 genesis_block.number + 1,
759 BlockParams { parent: Some(genesis_hash), ..Default::default() },
760 )
761 .try_recover()?],
762 Default::default(),
763 BTreeMap::new(),
764 )),
765 };
766
767 let (notifications_tx, notifications_rx) = mpsc::channel(1);
768
769 notifications_tx.send(new_notification.clone()).await?;
770
771 let mut notifications = ExExNotificationsWithoutHead::new(
772 node_head,
773 provider,
774 EthEvmConfig::mainnet(),
775 notifications_rx,
776 wal.handle(),
777 )
778 .with_head(exex_head);
779
780 assert_eq!(
783 notifications.next().await.transpose()?,
784 Some(exex_head_notification.into_inverted())
785 );
786
787 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
789
790 Ok(())
791 }
792}