1use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use futures::{Stream, StreamExt};
5use reth_ethereum_primitives::EthPrimitives;
6use reth_evm::ConfigureEvm;
7use reth_exex_types::ExExHead;
8use reth_node_api::NodePrimitives;
9use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
10use reth_tracing::tracing::debug;
11use std::{
12 fmt::Debug,
13 pin::Pin,
14 sync::Arc,
15 task::{ready, Context, Poll},
16};
17use tokio::sync::mpsc::Receiver;
18
19#[derive(Debug)]
23pub struct ExExNotifications<P, E>
24where
25 E: ConfigureEvm,
26{
27 inner: ExExNotificationsInner<P, E>,
28}
29
30pub trait ExExNotificationsStream<N: NodePrimitives = EthPrimitives>:
34 Stream<Item = eyre::Result<ExExNotification<N>>> + Unpin
35{
36 fn set_without_head(&mut self);
42
43 fn set_with_head(&mut self, exex_head: ExExHead);
50
51 fn without_head(self) -> Self
55 where
56 Self: Sized;
57
58 fn with_head(self, exex_head: ExExHead) -> Self
62 where
63 Self: Sized;
64}
65
66#[derive(Debug)]
67enum ExExNotificationsInner<P, E>
68where
69 E: ConfigureEvm,
70{
71 WithoutHead(ExExNotificationsWithoutHead<P, E>),
73 WithHead(Box<ExExNotificationsWithHead<P, E>>),
76 Invalid,
79}
80
81impl<P, E> ExExNotifications<P, E>
82where
83 E: ConfigureEvm,
84{
85 pub const fn new(
87 node_head: BlockNumHash,
88 provider: P,
89 evm_config: E,
90 notifications: Receiver<ExExNotification<E::Primitives>>,
91 wal_handle: WalHandle<E::Primitives>,
92 ) -> Self {
93 Self {
94 inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new(
95 node_head,
96 provider,
97 evm_config,
98 notifications,
99 wal_handle,
100 )),
101 }
102 }
103}
104
105impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
106where
107 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
108 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
109{
110 fn set_without_head(&mut self) {
111 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
112 self.inner = ExExNotificationsInner::WithoutHead(match current {
113 ExExNotificationsInner::WithoutHead(notifications) => notifications,
114 ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new(
115 notifications.initial_local_head,
116 notifications.provider,
117 notifications.evm_config,
118 notifications.notifications,
119 notifications.wal_handle,
120 ),
121 ExExNotificationsInner::Invalid => unreachable!(),
122 });
123 }
124
125 fn set_with_head(&mut self, exex_head: ExExHead) {
126 let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid);
127 self.inner = ExExNotificationsInner::WithHead(match current {
128 ExExNotificationsInner::WithoutHead(notifications) => {
129 Box::new(notifications.with_head(exex_head))
130 }
131 ExExNotificationsInner::WithHead(notifications) => {
132 Box::new(ExExNotificationsWithHead::new(
133 notifications.initial_local_head,
134 notifications.provider,
135 notifications.evm_config,
136 notifications.notifications,
137 notifications.wal_handle,
138 exex_head,
139 ))
140 }
141 ExExNotificationsInner::Invalid => unreachable!(),
142 });
143 }
144
145 fn without_head(mut self) -> Self {
146 self.set_without_head();
147 self
148 }
149
150 fn with_head(mut self, exex_head: ExExHead) -> Self {
151 self.set_with_head(exex_head);
152 self
153 }
154}
155
156impl<P, E> Stream for ExExNotifications<P, E>
157where
158 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
159 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
160{
161 type Item = eyre::Result<ExExNotification<E::Primitives>>;
162
163 fn poll_next(
164 self: std::pin::Pin<&mut Self>,
165 cx: &mut std::task::Context<'_>,
166 ) -> std::task::Poll<Option<Self::Item>> {
167 match &mut self.get_mut().inner {
168 ExExNotificationsInner::WithoutHead(notifications) => {
169 notifications.poll_next_unpin(cx).map(|result| result.map(Ok))
170 }
171 ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx),
172 ExExNotificationsInner::Invalid => unreachable!(),
173 }
174 }
175}
176
177pub struct ExExNotificationsWithoutHead<P, E>
179where
180 E: ConfigureEvm,
181{
182 node_head: BlockNumHash,
183 provider: P,
184 evm_config: E,
185 notifications: Receiver<ExExNotification<E::Primitives>>,
186 wal_handle: WalHandle<E::Primitives>,
187}
188
189impl<P: Debug, E> Debug for ExExNotificationsWithoutHead<P, E>
190where
191 E: ConfigureEvm + Debug,
192{
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("ExExNotifications")
195 .field("provider", &self.provider)
196 .field("evm_config", &self.evm_config)
197 .field("notifications", &self.notifications)
198 .finish()
199 }
200}
201
202impl<P, E> ExExNotificationsWithoutHead<P, E>
203where
204 E: ConfigureEvm,
205{
206 const fn new(
208 node_head: BlockNumHash,
209 provider: P,
210 evm_config: E,
211 notifications: Receiver<ExExNotification<E::Primitives>>,
212 wal_handle: WalHandle<E::Primitives>,
213 ) -> Self {
214 Self { node_head, provider, evm_config, notifications, wal_handle }
215 }
216
217 fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead<P, E> {
219 ExExNotificationsWithHead::new(
220 self.node_head,
221 self.provider,
222 self.evm_config,
223 self.notifications,
224 self.wal_handle,
225 head,
226 )
227 }
228}
229
230impl<P: Unpin, E> Stream for ExExNotificationsWithoutHead<P, E>
231where
232 E: ConfigureEvm,
233{
234 type Item = ExExNotification<E::Primitives>;
235
236 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237 self.get_mut().notifications.poll_recv(cx)
238 }
239}
240
241#[derive(Debug)]
250pub struct ExExNotificationsWithHead<P, E>
251where
252 E: ConfigureEvm,
253{
254 initial_local_head: BlockNumHash,
256 provider: P,
257 evm_config: E,
258 notifications: Receiver<ExExNotification<E::Primitives>>,
259 wal_handle: WalHandle<E::Primitives>,
260 initial_exex_head: ExExHead,
262
263 pending_check_canonical: bool,
266 pending_check_backfill: bool,
269 backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
271}
272
273impl<P, E> ExExNotificationsWithHead<P, E>
274where
275 E: ConfigureEvm,
276{
277 const fn new(
279 node_head: BlockNumHash,
280 provider: P,
281 evm_config: E,
282 notifications: Receiver<ExExNotification<E::Primitives>>,
283 wal_handle: WalHandle<E::Primitives>,
284 exex_head: ExExHead,
285 ) -> Self {
286 Self {
287 initial_local_head: node_head,
288 provider,
289 evm_config,
290 notifications,
291 wal_handle,
292 initial_exex_head: exex_head,
293 pending_check_canonical: true,
294 pending_check_backfill: true,
295 backfill_job: None,
296 }
297 }
298}
299
300impl<P, E> ExExNotificationsWithHead<P, E>
301where
302 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
303 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
304{
305 fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification<E::Primitives>>> {
311 if self.provider.is_known(self.initial_exex_head.block.hash)? &&
312 self.initial_exex_head.block.number <= self.initial_local_head.number
313 {
314 debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
316 return Ok(None)
317 }
318
319 let Some(notification) = self
324 .wal_handle
325 .get_committed_notification_by_block_hash(&self.initial_exex_head.block.hash)?
326 else {
327 if self.initial_exex_head.block.number > self.initial_local_head.number {
329 debug!(target: "exex::notifications", "ExEx head is ahead of the canonical chain");
330 return Ok(None);
331 }
332
333 return Err(eyre::eyre!(
334 "Could not find notification for block hash {:?} in the WAL",
335 self.initial_exex_head.block.hash
336 ))
337 };
338
339 let committed_chain = notification.committed_chain().unwrap();
341 let new_exex_head =
342 (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into();
343 debug!(target: "exex::notifications", old_exex_head = ?self.initial_exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
344 self.initial_exex_head.block = new_exex_head;
345
346 Ok(Some(notification.into_inverted()))
349 }
350
351 fn check_backfill(&mut self) -> eyre::Result<()> {
362 let backfill_job_factory =
363 BackfillJobFactory::new(self.evm_config.clone(), self.provider.clone());
364 match self.initial_exex_head.block.number.cmp(&self.initial_local_head.number) {
365 std::cmp::Ordering::Less => {
366 debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill");
368 let backfill = backfill_job_factory
369 .backfill(
370 self.initial_exex_head.block.number + 1..=self.initial_local_head.number,
371 )
372 .into_stream();
373 self.backfill_job = Some(backfill);
374 }
375 std::cmp::Ordering::Equal => {
376 debug!(target: "exex::notifications", "ExEx is at the node head");
377 }
378 std::cmp::Ordering::Greater => {
379 debug!(target: "exex::notifications", "ExEx is ahead of the node head");
380 }
381 };
382
383 Ok(())
384 }
385}
386
387impl<P, E> Stream for ExExNotificationsWithHead<P, E>
388where
389 P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
390 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + Clone + Unpin + 'static,
391{
392 type Item = eyre::Result<ExExNotification<E::Primitives>>;
393
394 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395 let this = self.get_mut();
396
397 if this.pending_check_canonical {
399 if let Some(canonical_notification) = this.check_canonical()? {
400 return Poll::Ready(Some(Ok(canonical_notification)))
401 }
402
403 this.pending_check_canonical = false;
405 }
406
407 if this.pending_check_backfill {
409 this.check_backfill()?;
410 this.pending_check_backfill = false;
411 }
412
413 if let Some(backfill_job) = &mut this.backfill_job {
415 debug!(target: "exex::notifications", "Polling backfill job");
416 if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
417 debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
418 return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
419 new: Arc::new(chain),
420 })))
421 }
422
423 this.backfill_job = None;
425 }
426
427 loop {
429 let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
430 return Poll::Ready(None)
431 };
432
433 if let Some(committed) = notification.committed_chain() {
435 if this.initial_exex_head.block.number >= committed.tip().number() {
437 continue
438 }
439 }
440
441 return Poll::Ready(Some(Ok(notification)))
442 }
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449 use crate::Wal;
450 use alloy_consensus::Header;
451 use alloy_eips::BlockNumHash;
452 use eyre::OptionExt;
453 use futures::StreamExt;
454 use reth_db_common::init::init_genesis;
455 use reth_ethereum_primitives::Block;
456 use reth_evm_ethereum::EthEvmConfig;
457 use reth_primitives_traits::Block as _;
458 use reth_provider::{
459 providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter,
460 Chain, DBProvider, DatabaseProviderFactory,
461 };
462 use reth_testing_utils::generators::{self, random_block, BlockParams};
463 use std::collections::BTreeMap;
464 use tokio::sync::mpsc;
465
466 #[tokio::test]
467 async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> {
468 let mut rng = generators::rng();
469
470 let temp_dir = tempfile::tempdir().unwrap();
471 let wal = Wal::new(temp_dir.path()).unwrap();
472
473 let provider_factory = create_test_provider_factory();
474 let genesis_hash = init_genesis(&provider_factory)?;
475 let genesis_block = provider_factory
476 .block(genesis_hash.into())?
477 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
478
479 let provider = BlockchainProvider::new(provider_factory.clone())?;
480
481 let node_head_block = random_block(
482 &mut rng,
483 genesis_block.number + 1,
484 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
485 )
486 .try_recover()?;
487 let node_head = node_head_block.num_hash();
488 let provider_rw = provider_factory.provider_rw()?;
489 provider_rw.insert_block(&node_head_block)?;
490 provider_rw.commit()?;
491 let exex_head =
492 ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
493
494 let notification = ExExNotification::ChainCommitted {
495 new: Arc::new(Chain::new(
496 vec![random_block(
497 &mut rng,
498 node_head.number + 1,
499 BlockParams { parent: Some(node_head.hash), ..Default::default() },
500 )
501 .try_recover()?],
502 Default::default(),
503 BTreeMap::new(),
504 BTreeMap::new(),
505 )),
506 };
507
508 let (notifications_tx, notifications_rx) = mpsc::channel(1);
509
510 notifications_tx.send(notification.clone()).await?;
511
512 let mut notifications = ExExNotificationsWithoutHead::new(
513 node_head,
514 provider,
515 EthEvmConfig::mainnet(),
516 notifications_rx,
517 wal.handle(),
518 )
519 .with_head(exex_head);
520
521 assert_eq!(
523 notifications.next().await.transpose()?,
524 Some(ExExNotification::ChainCommitted {
525 new: Arc::new(
526 BackfillJobFactory::new(
527 notifications.evm_config.clone(),
528 notifications.provider.clone()
529 )
530 .backfill(1..=1)
531 .next()
532 .ok_or_eyre("failed to backfill")??
533 )
534 })
535 );
536
537 assert_eq!(notifications.next().await.transpose()?, Some(notification));
539
540 Ok(())
541 }
542
543 #[tokio::test]
544 async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
545 let temp_dir = tempfile::tempdir().unwrap();
546 let wal = Wal::new(temp_dir.path()).unwrap();
547
548 let provider_factory = create_test_provider_factory();
549 let genesis_hash = init_genesis(&provider_factory)?;
550 let genesis_block = provider_factory
551 .block(genesis_hash.into())?
552 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
553
554 let provider = BlockchainProvider::new(provider_factory)?;
555
556 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
557 let exex_head = ExExHead { block: node_head };
558
559 let notification = ExExNotification::ChainCommitted {
560 new: Arc::new(Chain::new(
561 vec![Block {
562 header: Header {
563 parent_hash: node_head.hash,
564 number: node_head.number + 1,
565 ..Default::default()
566 },
567 ..Default::default()
568 }
569 .seal_slow()
570 .try_recover()?],
571 Default::default(),
572 BTreeMap::new(),
573 BTreeMap::new(),
574 )),
575 };
576
577 let (notifications_tx, notifications_rx) = mpsc::channel(1);
578
579 notifications_tx.send(notification.clone()).await?;
580
581 let mut notifications = ExExNotificationsWithoutHead::new(
582 node_head,
583 provider,
584 EthEvmConfig::mainnet(),
585 notifications_rx,
586 wal.handle(),
587 )
588 .with_head(exex_head);
589
590 let new_notification = notifications.next().await.transpose()?;
591 assert_eq!(new_notification, Some(notification));
592
593 Ok(())
594 }
595
596 #[tokio::test]
597 async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
598 let mut rng = generators::rng();
599
600 let temp_dir = tempfile::tempdir().unwrap();
601 let wal = Wal::new(temp_dir.path()).unwrap();
602
603 let provider_factory = create_test_provider_factory();
604 let genesis_hash = init_genesis(&provider_factory)?;
605 let genesis_block = provider_factory
606 .block(genesis_hash.into())?
607 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
608
609 let provider = BlockchainProvider::new(provider_factory)?;
610
611 let node_head_block = random_block(
612 &mut rng,
613 genesis_block.number + 1,
614 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
615 )
616 .try_recover()?;
617 let node_head = node_head_block.num_hash();
618 let provider_rw = provider.database_provider_rw()?;
619 provider_rw.insert_block(&node_head_block)?;
620 provider_rw.commit()?;
621 let node_head_notification = ExExNotification::ChainCommitted {
622 new: Arc::new(
623 BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
624 .backfill(node_head.number..=node_head.number)
625 .next()
626 .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
627 ),
628 };
629
630 let exex_head_block = random_block(
631 &mut rng,
632 genesis_block.number + 1,
633 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
634 );
635 let exex_head = ExExHead { block: exex_head_block.num_hash() };
636 let exex_head_notification = ExExNotification::ChainCommitted {
637 new: Arc::new(Chain::new(
638 vec![exex_head_block.clone().try_recover()?],
639 Default::default(),
640 BTreeMap::new(),
641 BTreeMap::new(),
642 )),
643 };
644 wal.commit(&exex_head_notification)?;
645
646 let new_notification = ExExNotification::ChainCommitted {
647 new: Arc::new(Chain::new(
648 vec![random_block(
649 &mut rng,
650 node_head.number + 1,
651 BlockParams { parent: Some(node_head.hash), ..Default::default() },
652 )
653 .try_recover()?],
654 Default::default(),
655 BTreeMap::new(),
656 BTreeMap::new(),
657 )),
658 };
659
660 let (notifications_tx, notifications_rx) = mpsc::channel(1);
661
662 notifications_tx.send(new_notification.clone()).await?;
663
664 let mut notifications = ExExNotificationsWithoutHead::new(
665 node_head,
666 provider,
667 EthEvmConfig::mainnet(),
668 notifications_rx,
669 wal.handle(),
670 )
671 .with_head(exex_head);
672
673 assert_eq!(
676 notifications.next().await.transpose()?,
677 Some(exex_head_notification.into_inverted())
678 );
679 assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
682 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
684
685 Ok(())
686 }
687
688 #[tokio::test]
689 async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
690 reth_tracing::init_test_tracing();
691 let mut rng = generators::rng();
692
693 let temp_dir = tempfile::tempdir().unwrap();
694 let wal = Wal::new(temp_dir.path()).unwrap();
695
696 let provider_factory = create_test_provider_factory();
697 let genesis_hash = init_genesis(&provider_factory)?;
698 let genesis_block = provider_factory
699 .block(genesis_hash.into())?
700 .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
701
702 let provider = BlockchainProvider::new(provider_factory)?;
703
704 let exex_head_block = random_block(
705 &mut rng,
706 genesis_block.number + 1,
707 BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
708 );
709 let exex_head_notification = ExExNotification::ChainCommitted {
710 new: Arc::new(Chain::new(
711 vec![exex_head_block.clone().try_recover()?],
712 Default::default(),
713 BTreeMap::new(),
714 BTreeMap::new(),
715 )),
716 };
717 wal.commit(&exex_head_notification)?;
718
719 let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
720 let exex_head = ExExHead {
721 block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
722 };
723
724 let new_notification = ExExNotification::ChainCommitted {
725 new: Arc::new(Chain::new(
726 vec![random_block(
727 &mut rng,
728 genesis_block.number + 1,
729 BlockParams { parent: Some(genesis_hash), ..Default::default() },
730 )
731 .try_recover()?],
732 Default::default(),
733 BTreeMap::new(),
734 BTreeMap::new(),
735 )),
736 };
737
738 let (notifications_tx, notifications_rx) = mpsc::channel(1);
739
740 notifications_tx.send(new_notification.clone()).await?;
741
742 let mut notifications = ExExNotificationsWithoutHead::new(
743 node_head,
744 provider,
745 EthEvmConfig::mainnet(),
746 notifications_rx,
747 wal.handle(),
748 )
749 .with_head(exex_head);
750
751 assert_eq!(
754 notifications.next().await.transpose()?,
755 Some(exex_head_notification.into_inverted())
756 );
757
758 assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
760
761 Ok(())
762 }
763}