use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent,
ConsensusEngineLiveSyncProgress, EthBeaconConsensus,
};
use alloy_consensus::Header;
use alloy_primitives::{BlockNumber, B256};
use futures::FutureExt;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
};
use reth_node_types::{BodyTy, HeaderTy};
use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock};
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventSender;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tracing::trace;
pub(crate) struct EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient,
{
full_block_client: FullBlockClient<Client>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
pipeline_state: PipelineState<N>,
pending_pipeline_target: Option<PipelineTarget>,
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
max_block: Option<BlockNumber>,
metrics: EngineSyncMetrics,
}
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient,
{
pub(crate) fn new(
pipeline: Pipeline<N>,
client: Client,
pipeline_task_spawner: Box<dyn TaskSpawner>,
max_block: Option<BlockNumber>,
chain_spec: Arc<N::ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
client,
Arc::new(EthBeaconConsensus::new(chain_spec)),
),
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None,
inflight_full_block_requests: Vec::new(),
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
event_sender,
max_block,
metrics: EngineSyncMetrics::default(),
}
}
}
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
{
fn update_block_download_metrics(&self) {
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
}
#[cfg(test)]
pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
self.max_block = Some(block);
}
pub(crate) fn clear_block_download_requests(&mut self) {
self.inflight_full_block_requests.clear();
self.inflight_block_range_requests.clear();
self.range_buffered_blocks.clear();
self.update_block_download_metrics();
}
pub(crate) fn cancel_full_block_request(&mut self, hash: B256) {
self.inflight_full_block_requests.retain(|req| *req.hash() != hash);
self.update_block_download_metrics();
}
#[allow(dead_code)]
pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
}
pub(crate) const fn is_pipeline_idle(&self) -> bool {
self.pipeline_state.is_idle()
}
pub(crate) const fn is_pipeline_active(&self) -> bool {
!self.is_pipeline_idle()
}
pub(crate) fn is_inflight_request(&self, hash: B256) -> bool {
self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
}
pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
if count == 1 {
self.download_full_block(hash);
} else {
trace!(
target: "consensus::engine",
?hash,
?count,
"start downloading full block range."
);
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
},
));
let request = self.full_block_client.get_full_block_range(hash, count);
self.inflight_block_range_requests.push(request);
}
}
pub(crate) fn download_full_block(&mut self, hash: B256) -> bool {
if self.is_inflight_request(hash) {
return false
}
trace!(
target: "consensus::engine::sync",
?hash,
"Start downloading full block"
);
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
},
));
let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);
self.update_block_download_metrics();
true
}
pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
if target.sync_target().is_some_and(|target| target.is_zero()) {
trace!(
target: "consensus::engine::sync",
"Pipeline target cannot be zero hash."
);
return
}
self.pending_pipeline_target = Some(target);
}
pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
let has_reached_max_block = self.max_block.is_some_and(|target| progress >= target);
if has_reached_max_block {
trace!(
target: "consensus::engine::sync",
?progress,
max_block = ?self.max_block,
"Consensus engine reached max block"
);
}
has_reached_max_block
}
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
let res = match self.pipeline_state {
PipelineState::Idle(_) => return Poll::Pending,
PipelineState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let ev = match res {
Ok((pipeline, result)) => {
let minimum_block_number = pipeline.minimum_block_number();
let reached_max_block =
self.has_reached_max_block(minimum_block_number.unwrap_or_default());
self.pipeline_state = PipelineState::Idle(Some(pipeline));
EngineSyncEvent::PipelineFinished { result, reached_max_block }
}
Err(_) => {
EngineSyncEvent::PipelineTaskDropped
}
};
Poll::Ready(ev)
}
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent<N::Primitives>> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take()?;
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
);
self.pipeline_state = PipelineState::Running(rx);
self.clear_block_download_requests();
Some(EngineSyncEvent::PipelineStarted(Some(target)))
}
PipelineState::Running(_) => None,
}
}
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
if let Some(event) = self.try_spawn_pipeline() {
return Poll::Ready(event)
}
if !self.is_pipeline_idle() {
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
if let Poll::Ready(block) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
} else {
self.inflight_full_block_requests.push(request);
}
}
for idx in (0..self.inflight_block_range_requests.len()).rev() {
let mut request = self.inflight_block_range_requests.swap_remove(idx);
if let Poll::Ready(blocks) = request.poll_unpin(cx) {
trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
self.range_buffered_blocks
.extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
} else {
self.inflight_block_range_requests.push(request);
}
}
self.update_block_download_metrics();
if let Some(block) = self.range_buffered_blocks.pop() {
while let Some(peek) = self.range_buffered_blocks.peek_mut() {
if peek.0 .0.hash() == block.0 .0.hash() {
PeekMut::pop(peek);
} else {
break
}
}
return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
}
Poll::Pending
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlock<H = Header, B = BlockBody>(SealedBlock<H, B>);
impl<H, B> PartialOrd for OrderedSealedBlock<H, B>
where
H: reth_primitives_traits::BlockHeader + 'static,
B: reth_primitives_traits::BlockBody + 'static,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<H, B> Ord for OrderedSealedBlock<H, B>
where
H: reth_primitives_traits::BlockHeader + 'static,
B: reth_primitives_traits::BlockBody + 'static,
{
fn cmp(&self, other: &Self) -> Ordering {
self.0.number().cmp(&other.0.number())
}
}
#[derive(Debug)]
pub(crate) enum EngineSyncEvent<N: NodePrimitives = EthPrimitives> {
FetchedFullBlock(SealedBlock<N::BlockHeader, N::BlockBody>),
PipelineStarted(Option<PipelineTarget>),
PipelineFinished {
result: Result<ControlFlow, PipelineError>,
reached_max_block: bool,
},
PipelineTaskDropped,
}
enum PipelineState<N: ProviderNodeTypes> {
Idle(Option<Pipeline<N>>),
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<N: ProviderNodeTypes> PipelineState<N> {
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient};
use reth_primitives::{BlockBody, SealedHeader};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome,
};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages_api::StageCheckpoint;
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, ops::Range};
use tokio::sync::watch;
struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<ExecutionOutcome>,
max_block: Option<BlockNumber>,
}
impl TestPipelineBuilder {
const fn new() -> Self {
Self {
pipeline_exec_outputs: VecDeque::new(),
executor_results: Vec::new(),
max_block: None,
}
}
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}
#[allow(dead_code)]
fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_results = executor_results;
self
}
#[allow(dead_code)]
const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<MockNodeTypesWithDB> {
reth_tracing::init_test_tracing();
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
if let Some(max_block) = self.max_block {
pipeline = pipeline.with_max_block(max_block);
}
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
pipeline.build(provider_factory, static_file_producer)
}
}
struct TestSyncControllerBuilder<Client> {
max_block: Option<BlockNumber>,
client: Option<Client>,
}
impl<Client> TestSyncControllerBuilder<Client> {
const fn new() -> Self {
Self { max_block: None, client: None }
}
#[allow(dead_code)]
const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
self.max_block = Some(max_block);
self
}
fn with_client(mut self, client: Client) -> Self {
self.client = Some(client);
self
}
fn build<N>(
self,
pipeline: Pipeline<N>,
chain_spec: Arc<N::ChainSpec>,
) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
where
N: ProviderNodeTypes,
Client: EthBlockClient + 'static,
{
let client = self
.client
.map(Either::Left)
.unwrap_or_else(|| Either::Right(TestFullBlockClient::default()));
EngineSyncController::new(
pipeline,
client,
Box::<TokioTaskExecutor>::default(),
self.max_block,
chain_spec,
Default::default(),
)
}
}
#[tokio::test]
async fn pipeline_started_after_setting_target() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
insert_headers_into_client(&client, SealedHeader::default(), 0..10);
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(5),
done: true,
})]))
.build(chain_spec.clone());
let mut sync_controller = TestSyncControllerBuilder::new()
.with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here");
sync_controller.set_pipeline_sync_target(tip.hash().into());
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_event = poll!(sync_future);
assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
assert_eq!(target.sync_target().unwrap(), tip.hash());
});
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 }));
assert!(!reached_max_block);
});
}
fn insert_headers_into_client(
client: &TestFullBlockClient,
genesis_header: SealedHeader,
range: Range<usize>,
) {
let mut sealed_header = genesis_header;
let body = BlockBody::default();
for _ in range {
let (mut header, hash) = sealed_header.split();
header.parent_hash = hash;
header.number += 1;
header.timestamp += 1;
sealed_header = SealedHeader::seal(header);
client.insert(sealed_header.clone(), body.clone());
}
}
#[tokio::test]
async fn controller_sends_range_request() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
};
let header = SealedHeader::seal(header);
insert_headers_into_client(&client, header, 0..10);
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let mut sync_controller = TestSyncControllerBuilder::new()
.with_client(client.clone())
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here");
sync_controller.download_block_range(tip.hash(), tip.number);
assert_eq!(sync_controller.inflight_block_range_requests.len(), 1);
let first_req = sync_controller.inflight_block_range_requests.first().unwrap();
assert_eq!(first_req.start_hash(), tip.hash());
assert_eq!(first_req.count(), tip.number);
for num in 1..=10 {
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => {
assert_eq!(block.number, num);
});
}
}
}