use crate::{
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
};
use alloy_eips::BlockNumHash;
use futures::StreamExt;
use itertools::Itertools;
use metrics::Gauge;
use reth_chain_state::ForkChoiceStream;
use reth_chainspec::Head;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives::SealedHeader;
use reth_provider::HeaderProvider;
use reth_tracing::tracing::{debug, warn};
use std::{
collections::VecDeque,
fmt::Debug,
future::{poll_fn, Future},
ops::Not,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender},
watch,
};
use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
/// Default capacity for the ExEx manager.
///
/// NOTE(review): not referenced in this file; presumably callers pass it as `max_capacity` to
/// `ExExManager::new` — confirm.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
/// Number of blocks in the WAL above which the manager logs a warning after finalizing the WAL.
pub const WAL_BLOCKS_WARNING: usize = 128;
/// Origin of a notification handed to the ExEx manager.
///
/// The manager uses the source to decide whether the notification is committed to the WAL before
/// it is buffered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExExNotificationSource {
/// The notification was sent from the pipeline. The manager skips the WAL commit for it.
Pipeline,
/// The notification was sent from the blockchain tree. The manager commits it to the WAL.
BlockchainTree,
}
/// Metrics recorded per ExEx, labelled with the ExEx ID.
#[derive(Metrics)]
#[metrics(scope = "exex")]
struct ExExMetrics {
/// The total number of notifications sent to an ExEx.
notifications_sent_total: Counter,
/// The total number of events the manager has received from an ExEx.
events_sent_total: Counter,
}
/// The manager's side of the connection to a single ExEx.
///
/// It holds the channel used to send notifications to the ExEx, the channel used to receive
/// events from it, and the bookkeeping needed to know which notification to send next.
#[derive(Debug)]
pub struct ExExHandle {
/// The ID of the ExEx, used in logs and as the metrics label.
id: String,
/// Metrics for this ExEx.
metrics: ExExMetrics,
/// Sending half of the notification channel to the ExEx.
sender: PollSender<ExExNotification>,
/// Receiving half of the event channel from the ExEx.
receiver: UnboundedReceiver<ExExEvent>,
/// The ID of the next notification to send to this ExEx. It is advanced past a notification
/// once that notification has been sent or skipped.
next_notification_id: usize,
/// The height reported by the latest `FinishedHeight` event, or `None` if the ExEx has not
/// emitted one yet.
finished_height: Option<BlockNumHash>,
}
impl ExExHandle {
pub fn new<P, E>(
id: String,
node_head: Head,
provider: P,
executor: E,
wal_handle: WalHandle,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let notifications =
ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);
(
Self {
id: id.clone(),
metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
sender: PollSender::new(notification_tx),
receiver: event_rx,
next_notification_id: 0,
finished_height: None,
},
event_tx,
notifications,
)
}
fn send(
&mut self,
cx: &mut Context<'_>,
(notification_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
if let Some(finished_height) = self.finished_height {
match notification {
ExExNotification::ChainCommitted { new } => {
if finished_height.number >= new.tip().number {
debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
?finished_height,
new_tip = %new.tip().number,
"Skipping notification"
);
self.next_notification_id = notification_id + 1;
return Poll::Ready(Ok(()))
}
}
ExExNotification::ChainReorged { .. } | ExExNotification::ChainReverted { .. } => {}
}
}
debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Reserving slot for notification"
);
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => (),
other => return other,
}
debug!(
target: "exex::manager",
exex_id = %self.id,
%notification_id,
"Sending notification"
);
match self.sender.send_item(notification.clone()) {
Ok(()) => {
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);
Poll::Ready(Ok(()))
}
Err(err) => Poll::Ready(Err(err)),
}
}
}
/// Metrics for the ExEx manager.
#[derive(Metrics)]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Maximum number of notifications the manager's buffer may hold.
max_capacity: Gauge,
/// Remaining capacity of the manager's buffer.
current_capacity: Gauge,
/// Number of notifications currently held in the manager's buffer.
buffer_size: Gauge,
/// Number of ExExs registered with the manager.
num_exexs: Gauge,
}
/// The ExEx manager: a future that receives notifications through its handles, buffers them, and
/// forwards them to every registered ExEx.
///
/// It also tracks the heights the ExExs report as finished and finalizes the WAL when a new
/// finalized header arrives.
#[derive(Debug)]
pub struct ExExManager<P> {
/// Provider used to check whether the hashes reported by the ExExs are known.
provider: P,
/// Handles to the registered ExExs.
exex_handles: Vec<ExExHandle>,
/// Receiver for notifications sent through the manager handles.
handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>,
/// The lowest notification ID still needed by some ExEx; buffer entries below it are dropped.
min_id: usize,
/// The ID that will be assigned to the next buffered notification.
next_id: usize,
/// Buffered notifications, paired with their IDs, in the order they were received.
buffer: VecDeque<(usize, ExExNotification)>,
/// Maximum number of notifications the buffer may hold.
max_capacity: usize,
/// Remaining buffer capacity, shared with the manager handles.
current_capacity: Arc<AtomicUsize>,
/// Publishes whether the buffer currently has spare capacity.
is_ready: watch::Sender<bool>,
/// Publishes the lowest finished height across all ExExs.
finished_height: watch::Sender<FinishedExExHeight>,
/// Write-ahead log that notifications from the blockchain tree are committed to.
wal: Wal,
/// Stream of finalized headers that triggers WAL finalization.
finalized_header_stream: ForkChoiceStream<SealedHeader>,
/// Handle to this manager; `handle()` returns clones of it.
handle: ExExManagerHandle,
/// Metrics for the manager.
metrics: ExExManagerMetrics,
}
impl<P> ExExManager<P> {
    /// Creates a new manager for the given ExEx `handles`.
    ///
    /// `max_capacity` is the maximum number of notifications the manager buffers at any time.
    /// The finished-height channel starts as [`FinishedExExHeight::NoExExs`] when `handles` is
    /// empty and as [`FinishedExExHeight::NotReady`] otherwise. The readiness channel starts as
    /// `true`.
    pub fn new(
        provider: P,
        handles: Vec<ExExHandle>,
        max_capacity: usize,
        wal: Wal,
        finalized_header_stream: ForkChoiceStream<SealedHeader>,
    ) -> Self {
        let num_exexs = handles.len();

        let (handle_tx, handle_rx) = mpsc::unbounded_channel();
        let (is_ready_tx, is_ready_rx) = watch::channel(true);
        let (finished_height_tx, finished_height_rx) = watch::channel(if num_exexs == 0 {
            FinishedExExHeight::NoExExs
        } else {
            FinishedExExHeight::NotReady
        });

        // Shared between the manager (writer) and every handle (readers).
        let current_capacity = Arc::new(AtomicUsize::new(max_capacity));

        let metrics = ExExManagerMetrics::default();
        metrics.max_capacity.set(max_capacity as f64);
        metrics.num_exexs.set(num_exexs as f64);

        Self {
            provider,

            exex_handles: handles,

            handle_rx,

            min_id: 0,
            next_id: 0,
            buffer: VecDeque::with_capacity(max_capacity),
            max_capacity,
            current_capacity: Arc::clone(&current_capacity),

            is_ready: is_ready_tx,
            finished_height: finished_height_tx,

            wal,
            finalized_header_stream,

            handle: ExExManagerHandle {
                exex_tx: handle_tx,
                num_exexs,
                is_ready_receiver: is_ready_rx.clone(),
                is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
                current_capacity,
                finished_height: finished_height_rx,
            },
            metrics,
        }
    }

    /// Returns a clone of the handle to this manager.
    pub fn handle(&self) -> ExExManagerHandle {
        self.handle.clone()
    }

    /// Recomputes the remaining buffer capacity and publishes it to the shared counter, the
    /// metrics, and the readiness channel.
    fn update_capacity(&self) {
        let capacity = self.max_capacity.saturating_sub(self.buffer.len());
        self.current_capacity.store(capacity, Ordering::Relaxed);
        self.metrics.current_capacity.set(capacity as f64);
        self.metrics.buffer_size.set(self.buffer.len() as f64);

        // A failed send is deliberately ignored.
        let _ = self.is_ready.send(capacity > 0);
    }

    /// Appends a notification to the buffer under the next free ID and advances that ID.
    fn push_notification(&mut self, notification: ExExNotification) {
        let next_id = self.next_id;
        self.buffer.push_back((next_id, notification));
        self.next_id += 1;
    }
}
impl<P> ExExManager<P>
where
    P: HeaderProvider,
{
    /// Finalizes the WAL in response to a new finalized header.
    ///
    /// The WAL is finalized only when every ExEx has reported a finished height whose hash is
    /// known to the provider. It is then finalized up to the lowest of those heights and the
    /// finalized header. Otherwise the WAL is left untouched and the offending ExExs are logged.
    ///
    /// Returns an error if the provider lookup or the WAL finalization fails.
    fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
        debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header");

        // ExExs that report the same hash (or no height at all) are checked only once.
        let distinct_heights = self
            .exex_handles
            .iter()
            .map(|handle| (&handle.id, handle.finished_height))
            .unique_by(|(_, height)| height.map(|height| height.hash));

        let mut exex_finished_heights = Vec::new();
        for (exex_id, height) in distinct_heights {
            // An ExEx without a finished height is never treated as canonical.
            let is_canonical = match height {
                Some(height) => self.provider.is_known(&height.hash)?,
                None => false,
            };
            exex_finished_heights.push((exex_id, height, is_canonical));
        }

        let all_canonical = exex_finished_heights.iter().all(|&(_, _, is_canonical)| is_canonical);
        if all_canonical.not() {
            let unfinalized_exexes = exex_finished_heights
                .into_iter()
                .filter(|(_, _, is_canonical)| is_canonical.not())
                .map(|(exex_id, height, _)| (exex_id, height))
                .format_with(", ", |(exex_id, num_hash), f| {
                    f(&format_args!("{exex_id} = {num_hash:?}"))
                })
                .to_string();
            debug!(
                target: "exex::manager",
                %unfinalized_exexes,
                "Not all ExExes are on the canonical chain, can't finalize the WAL"
            );
            return Ok(());
        }

        // The finalized header is chained last, so on equal block numbers an ExEx height wins.
        let lowest_finished_height = exex_finished_heights
            .iter()
            .filter_map(|&(_, height, _)| height)
            .chain(std::iter::once(finalized_header.num_hash()))
            .min_by_key(|height| height.number)
            .expect("the finalized header is always a candidate");

        self.wal.finalize(lowest_finished_height)?;
        if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
            warn!(
                target: "exex::manager",
                blocks = ?self.wal.num_blocks(),
                "WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
            );
        }

        Ok(())
    }
}
impl<P> Future for ExExManager<P>
where
    P: HeaderProvider + Unpin + 'static,
{
    type Output = eyre::Result<()>;

    /// Drives the manager.
    ///
    /// Each poll performs the following steps in order:
    /// 1. Drains pending events from every ExEx and records the reported finished heights.
    /// 2. Drains the finalized header stream and finalizes the WAL with the latest header.
    /// 3. Moves incoming notifications into the buffer until it is full, committing those that
    ///    come from the blockchain tree to the WAL.
    /// 4. Tries to send each ExEx the next notification it is waiting for.
    /// 5. Drops buffered notifications that every ExEx has moved past.
    /// 6. Publishes the lowest finished height once every ExEx has reported one.
    ///
    /// The future resolves only with an error (from the WAL, the provider, or a failed send);
    /// otherwise it returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Handle incoming ExEx events.
        for exex in &mut this.exex_handles {
            while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
                debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx");
                exex.metrics.events_sent_total.increment(1);
                match event {
                    ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
                }
            }
        }

        // Drain the finalized header stream and keep only the most recent header. The loop stops
        // on `Ready(None)`: a finished stream must not be polled again, and the end of the stream
        // must not discard a header received just before it.
        let mut last_finalized_header = None;
        while let Poll::Ready(Some(finalized_header)) =
            this.finalized_header_stream.poll_next_unpin(cx)
        {
            last_finalized_header = Some(finalized_header);
        }
        if let Some(header) = last_finalized_header {
            this.finalize_wal(header)?;
        }

        // Pull incoming notifications into the buffer while there is room for them.
        while this.buffer.len() < this.max_capacity {
            let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) else {
                break;
            };

            let committed_tip = notification.committed_chain().map(|chain| chain.tip().number);
            let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number);
            debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");

            match source {
                ExExNotificationSource::BlockchainTree => {
                    debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
                    this.wal.commit(&notification)?;
                }
                ExExNotificationSource::Pipeline => {
                    debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
                }
            }

            this.push_notification(notification);
        }

        // Publish the capacity as it stands after buffering.
        this.update_capacity();

        // Advance every ExEx by at most one notification and track the lowest ID still needed.
        let mut min_id = usize::MAX;
        for idx in (0..this.exex_handles.len()).rev() {
            // The handle is taken out of the vector while it is used and pushed back afterwards.
            let mut exex = this.exex_handles.swap_remove(idx);

            // Buffer entries are addressed relative to the lowest ID kept in the buffer.
            let notification_index = exex
                .next_notification_id
                .checked_sub(this.min_id)
                .expect("exex expected notification ID outside the manager's range");
            if let Some(notification) = this.buffer.get(notification_index) {
                if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
                    // The channel was closed, which is irrecoverable for the manager.
                    return Poll::Ready(Err(err.into()))
                }
            }
            min_id = min_id.min(exex.next_notification_id);
            this.exex_handles.push(exex);
        }

        // Remove the notifications that every ExEx has already moved past.
        debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer");
        this.buffer.retain(|&(id, _)| id >= min_id);
        this.min_id = min_id;

        // Publish the capacity again now that the buffer may have shrunk.
        this.update_capacity();

        // The lowest finished height is only known once every ExEx has reported one.
        let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
            exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
        });
        if let Ok(finished_height) = finished_height {
            let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
        }

        Poll::Pending
    }
}
/// A handle to communicate with the `ExExManager`.
#[derive(Debug)]
pub struct ExExManagerHandle {
/// Channel used to send notifications, tagged with their source, to the manager.
exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>,
/// Number of ExExs registered with the manager.
num_exexs: usize,
/// Receiver for the manager's readiness flag. It is kept so that a fresh wait future can be
/// built when the handle is cloned.
is_ready_receiver: watch::Receiver<bool>,
/// Future that resolves once the manager's readiness flag is `true`; it is re-armed after each
/// completion.
is_ready: ReusableBoxFuture<'static, watch::Receiver<bool>>,
/// Remaining capacity of the manager's notification buffer.
current_capacity: Arc<AtomicUsize>,
/// Receiver for the finished height published by the manager.
finished_height: watch::Receiver<FinishedExExHeight>,
}
impl ExExManagerHandle {
    /// Creates a handle that is not connected to any manager.
    ///
    /// The opposite ends of all its channels are dropped right away. The handle reports no
    /// ExExs, zero capacity, and a finished height of [`FinishedExExHeight::NoExExs`].
    pub fn empty() -> Self {
        let exex_tx = mpsc::unbounded_channel().0;
        let is_ready_rx = watch::channel(true).1;
        let finished_height = watch::channel(FinishedExExHeight::NoExExs).1;

        Self {
            exex_tx,
            num_exexs: 0,
            is_ready_receiver: is_ready_rx.clone(),
            is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)),
            current_capacity: Arc::new(AtomicUsize::new(0)),
            finished_height,
        }
    }

    /// Sends a notification to the manager without waiting for capacity.
    ///
    /// Returns an error if the channel to the manager is closed.
    pub fn send(
        &self,
        source: ExExNotificationSource,
        notification: ExExNotification,
    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
        self.exex_tx.send((source, notification))
    }

    /// Waits until the manager is ready, then sends the notification.
    ///
    /// Returns an error if the channel to the manager is closed.
    pub async fn send_async(
        &mut self,
        source: ExExNotificationSource,
        notification: ExExNotification,
    ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
        self.ready().await;
        self.send(source, notification)
    }

    /// Returns the remaining capacity of the manager's notification buffer.
    pub fn capacity(&self) -> usize {
        self.current_capacity.load(Ordering::Relaxed)
    }

    /// Returns `true` if the manager's buffer has room for at least one more notification.
    pub fn has_capacity(&self) -> bool {
        self.capacity() != 0
    }

    /// Returns `true` if at least one ExEx is registered with the manager.
    pub const fn has_exexs(&self) -> bool {
        self.num_exexs != 0
    }

    /// Returns a receiver for the finished height published by the manager.
    pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
        self.finished_height.clone()
    }

    /// Waits until the manager is ready for new notifications.
    pub async fn ready(&mut self) {
        poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Polls whether the manager is ready for new notifications.
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let receiver = ready!(self.is_ready.poll(cx));
        // Re-arm the wait future with the returned receiver so the next call waits again.
        self.is_ready.set(make_wait_future(receiver));
        Poll::Ready(())
    }
}
/// Waits until the watched flag is `true`, then hands the receiver back so it can be reused.
///
/// An error from the wait is ignored; the receiver is returned either way.
async fn make_wait_future(mut rx: watch::Receiver<bool>) -> watch::Receiver<bool> {
    drop(rx.wait_for(|&is_ready| is_ready).await);
    rx
}
impl Clone for ExExManagerHandle {
fn clone(&self) -> Self {
Self {
exex_tx: self.exex_tx.clone(),
num_exexs: self.num_exexs,
is_ready_receiver: self.is_ready_receiver.clone(),
is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())),
current_capacity: self.current_capacity.clone(),
finished_height: self.finished_height.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
use futures::{StreamExt, TryStreamExt};
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
/// Builds a finalized header stream over a watch channel that starts at `None` and whose sender
/// is leaked, so the sender is never dropped.
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
    let (sender, receiver) = watch::channel(None);
    // Leak the sender instead of dropping it, which keeps the channel open for the whole test.
    std::mem::forget(sender);
    ForkChoiceStream::new(receiver)
}
/// An event sent through the ExEx's event sender arrives on the handle's receiver.
#[tokio::test]
async fn test_delivers_events() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();

    let (mut handle, event_tx, mut _notification_rx) =
        ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

    let sent = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random()));
    event_tx.send(sent).unwrap();

    let received = handle.receiver.recv().await.unwrap();
    assert_eq!(received, sent);
}
/// `has_exexs` is `false` for a manager without handles and `true` once a handle is registered.
#[tokio::test]
async fn test_has_exexs() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();

    let (exex_handle_1, _, _) =
        ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());

    let without_exexs =
        ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream());
    assert!(!without_exexs.handle.has_exexs());

    let with_one_exex =
        ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream());
    assert!(with_one_exex.handle.has_exexs());
}
/// `has_capacity` is `false` for a zero-capacity manager and `true` for a manager created with a
/// capacity of ten.
#[tokio::test]
async fn test_has_capacity() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();

    let (exex_handle_1, _, _) =
        ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());

    let zero_capacity =
        ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream());
    assert!(!zero_capacity.handle.has_capacity());

    let with_capacity =
        ExExManager::new((), vec![exex_handle_1], 10, wal, empty_finalized_header_stream());
    assert!(with_capacity.handle.has_capacity());
}
/// Pushed notifications are appended to the buffer in order, with consecutive IDs starting at
/// zero, and `next_id` advances with every push.
#[test]
fn test_push_notification() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();
    let (exex_handle, _, _) =
        ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

    let mut exex_manager =
        ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());

    // Builds a single-block `ChainCommitted` notification with the given hash byte and number.
    let committed = |hash_byte: u8, number: u64| {
        let mut block: SealedBlockWithSenders = Default::default();
        block.block.header.set_hash(B256::new([hash_byte; 32]));
        block.block.header.set_block_number(number);
        ExExNotification::ChainCommitted {
            new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
        }
    };

    // The first push lands at the front with ID 0.
    let notification1 = committed(0x01, 10);
    exex_manager.push_notification(notification1.clone());

    assert_eq!(exex_manager.buffer.len(), 1);
    let (first_id, first) = exex_manager.buffer.front().unwrap();
    assert_eq!(*first_id, 0);
    assert_eq!(*first, notification1);
    assert_eq!(exex_manager.next_id, 1);

    // The second push is appended after the first with ID 1.
    let notification2 = committed(0x02, 20);
    exex_manager.push_notification(notification2.clone());

    assert_eq!(exex_manager.buffer.len(), 2);
    let (first_id, first) = exex_manager.buffer.front().unwrap();
    assert_eq!(*first_id, 0);
    assert_eq!(*first, notification1);
    let (second_id, second) = exex_manager.buffer.get(1).unwrap();
    assert_eq!(*second_id, 1);
    assert_eq!(*second, notification2);
    assert_eq!(exex_manager.next_id, 2);
}
/// `update_capacity` publishes the maximum capacity minus the number of buffered notifications.
#[test]
fn test_update_capacity() {
    let wal_dir = tempfile::tempdir().unwrap();
    let wal = Wal::new(wal_dir.path()).unwrap();
    let (exex_handle, _, _) =
        ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

    let max_capacity = 5;
    let mut exex_manager = ExExManager::new(
        (),
        vec![exex_handle],
        max_capacity,
        wal,
        empty_finalized_header_stream(),
    );

    let mut block: SealedBlockWithSenders = Default::default();
    block.block.header.set_hash(B256::new([0x01; 32]));
    block.block.header.set_block_number(10);
    let notification = ExExNotification::ChainCommitted {
        new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
    };

    // Two buffered notifications leave `max_capacity - 2` free slots.
    for _ in 0..2 {
        exex_manager.push_notification(notification.clone());
    }
    exex_manager.update_capacity();
    assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);

    // An empty buffer restores the full capacity.
    exex_manager.buffer.clear();
    exex_manager.update_capacity();
    assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
}
/// A `FinishedHeight` event queued before the first poll is recorded on the ExEx handle and
/// published through the manager handle's finished-height channel.
#[tokio::test]
async fn test_updates_block_height() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
// A fresh handle has no finished height yet.
assert!(exex_handle.finished_height.is_none());
// Queue the event before the manager exists; it is picked up on the first poll.
let block = BlockNumHash::new(42, B256::random());
event_tx.send(ExExEvent::FinishedHeight(block)).unwrap();
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Poll the manager once by hand with a no-op waker.
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let mut pinned_manager = std::pin::pin!(exex_manager);
let _ = pinned_manager.as_mut().poll(&mut cx);
let updated_exex_handle = &pinned_manager.exex_handles[0];
assert_eq!(updated_exex_handle.finished_height, Some(block));
// The height must also be visible to holders of the manager handle.
let mut receiver = pinned_manager.handle.finished_height();
receiver.changed().await.unwrap();
let finished_height = *receiver.borrow();
assert_eq!(finished_height, FinishedExExHeight::Height(42));
}
/// With two ExExs, the published finished height is the lower one when the second ExEx reports
/// the lower height (42 and 10 give 10).
#[tokio::test]
async fn test_updates_block_height_lower() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
let block1 = BlockNumHash::new(42, B256::random());
let block2 = BlockNumHash::new(10, B256::random());
// Queue one event per ExEx before the manager is polled.
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Poll the manager once by hand with a no-op waker.
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let mut pinned_manager = std::pin::pin!(exex_manager);
let _ = pinned_manager.as_mut().poll(&mut cx);
let mut receiver = pinned_manager.handle.finished_height();
receiver.changed().await.unwrap();
let finished_height = *receiver.borrow();
assert_eq!(finished_height, FinishedExExHeight::Height(10));
}
/// With two ExExs, the published finished height is the lower one when the second ExEx reports
/// the greater height (42 and 100 give 42).
#[tokio::test]
async fn test_updates_block_height_greater() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());
// A fresh handle has no finished height yet.
assert!(exex_handle1.finished_height.is_none());
let block1 = BlockNumHash::new(42, B256::random());
let block2 = BlockNumHash::new(100, B256::random());
// Queue one event per ExEx before the manager is polled.
event_tx1.send(ExExEvent::FinishedHeight(block1)).unwrap();
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
// Poll the manager once by hand with a no-op waker.
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let mut pinned_manager = std::pin::pin!(exex_manager);
let _ = pinned_manager.as_mut().poll(&mut cx);
let mut receiver = pinned_manager.handle.finished_height();
receiver.changed().await.unwrap();
let finished_height = *receiver.borrow();
assert_eq!(finished_height, FinishedExExHeight::Height(42));
}
/// The manager buffers no more than `max_capacity` notifications: with three notifications
/// queued and a capacity of two, a single poll buffers exactly two.
#[tokio::test]
async fn test_exex_manager_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
let max_capacity = 2;
let mut exex_manager = ExExManager::new(
provider_factory,
vec![exex_handle_1],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Default::default()],
Default::default(),
Default::default(),
)),
};
// Queue three notifications, one more than the buffer can hold.
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification.clone()))
.unwrap();
exex_manager
.handle
.exex_tx
.send((ExExNotificationSource::BlockchainTree, notification))
.unwrap();
let mut pinned_manager = std::pin::pin!(exex_manager);
// Nothing is buffered until the manager is polled.
assert_eq!(pinned_manager.next_id, 0);
assert_eq!(pinned_manager.buffer.len(), 0);
let _ = pinned_manager.as_mut().poll(&mut cx);
// Only `max_capacity` notifications were taken from the channel.
assert_eq!(pinned_manager.next_id, 2);
assert_eq!(pinned_manager.buffer.len(), 2);
}
/// A new handle starts with the given ID and a next notification ID of zero. A notification sent
/// through it reaches the notifications stream and advances the next notification ID.
#[tokio::test]
async fn exex_handle_new() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
// Check the initial state of the handle.
assert_eq!(exex_handle.id, "test_exex");
assert_eq!(exex_handle.next_notification_id, 0);
// Build a committed chain made of two blocks, numbered 10 and 11.
let mut block1: SealedBlockWithSenders = Default::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let mut block2: SealedBlockWithSenders = Default::default();
block2.block.header.set_hash(B256::new([0x02; 32]));
block2.block.header.set_block_number(11);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
Default::default(),
Default::default(),
)),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send the notification under ID 22 and expect it to arrive unchanged.
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e),
}
// The next expected ID is the one following the notification just sent.
assert_eq!(exex_handle.next_notification_id, 23);
}
/// A `ChainCommitted` notification whose tip is below the ExEx's finished height is skipped:
/// `send` succeeds, nothing reaches the notifications stream, and the next notification ID still
/// advances.
#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
// The finished height (15) is above the tip of the chain committed below (10).
exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
let mut block1: SealedBlockWithSenders = Default::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
match exex_handle.send(&mut cx, &(22, notification)) {
Poll::Ready(Ok(())) => {
// The stream must have nothing to yield, because the notification was skipped.
poll_fn(|cx| {
assert!(notifications.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail");
}
}
// Skipping still advances the next expected ID.
assert_eq!(exex_handle.next_notification_id, 23);
}
/// A `ChainReorged` notification is delivered even when the ExEx's finished height is
/// `u64::MAX`.
#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
new: Arc::new(Chain::default()),
};
// Set the highest possible finished height; it must not cause the reorg to be skipped.
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}
assert_eq!(exex_handle.next_notification_id, 23);
}
/// A `ChainReverted` notification is delivered even when the ExEx's finished height is
/// `u64::MAX`.
#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);
let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
// Set the highest possible finished height; it must not cause the revert to be skipped.
exex_handle.finished_height = Some(BlockNumHash::new(u64::MAX, B256::random()));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}
assert_eq!(exex_handle.next_notification_id, 23);
}
/// End-to-end check of how the manager interacts with the WAL:
/// - a notification from the pipeline is delivered but not written to the WAL;
/// - a notification from the blockchain tree is delivered and written to the WAL;
/// - a finalized header leaves the WAL untouched while the ExEx has no finished height, or has
///   one with a random number and hash;
/// - once the ExEx reports the inserted block as finished, the next finalized header empties the
///   WAL.
#[tokio::test]
async fn test_exex_wal() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let mut rng = generators::rng();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory).unwrap();
let genesis_block = provider_factory
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
// Insert one random block on top of genesis so the provider has it in the database.
let block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.seal_with_senders()
.unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap();
provider_rw.commit().unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider.clone(),
EthExecutorProvider::mainnet(),
wal.handle(),
);
let genesis_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
// The finalized header stream starts with the genesis header.
let (finalized_headers_tx, rx) = watch::channel(None);
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
let finalized_header_stream = ForkChoiceStream::new(rx);
let mut exex_manager = std::pin::pin!(ExExManager::new(
provider,
vec![exex_handle],
2,
wal,
finalized_header_stream
));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
// Send the genesis notification from the pipeline and the block notification from the
// blockchain tree.
exex_manager
.handle()
.send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
// Each poll forwards one notification to the ExEx.
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(genesis_notification))
);
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(notification.clone()))
);
// Only the notification from the blockchain tree was written to the WAL.
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
// A finalized header arrives while the ExEx has no finished height: the WAL is unchanged.
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);
// The ExEx reports a finished height with a random number and hash: the WAL is unchanged.
events_tx
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification]
);
// The ExEx reports the inserted block as finished: the next finalized header empties the WAL.
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
Ok(())
}
}