use std::{collections::HashMap, io, path::Path};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::RequestError,
headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::SealedHeader;
use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};
use super::file_codec::BlockFileCodec;
use crate::receipt_file_client::FromReceiptReader;
/// Default byte length of chunk to read from chain file: 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
/// Front-end API for fetching chain data from a file instead of a network peer.
///
/// Blocks are assumed to be written one after another in the file as RLP bytes
/// (decoding errors surface as [`FileClientError::Rlp`]).
#[derive(Debug)]
pub struct FileClient<B: Block = reth_primitives::Block> {
    /// The buffered headers, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,
    /// A mapping between block hash and number, used to resolve hash-based requests.
    hash_to_number: HashMap<BlockHash, BlockNumber>,
    /// The buffered bodies, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks from the file. Carries the decode
    /// error and the bytes that failed to decode — a partial block at the end of a
    /// chunk is surfaced through this variant so the bytes can be carried over.
    /// NOTE: `{0}` displays only the decode error, not the raw bytes.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}
impl From<&'static str> for FileClientError {
    /// Wraps a static message in [`FileClientError::Custom`].
    fn from(msg: &'static str) -> Self {
        Self::Custom(msg)
    }
}
impl<B: FullBlock> FileClient<B> {
    /// Create a new file client from the file at `path`.
    ///
    /// Reads the entire file into memory and decodes it; returns
    /// [`FileClientError::Io`] on file errors and [`FileClientError::Rlp`] on
    /// decode errors.
    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        Self::from_file(file).await
    }

    /// Initialize the [`FileClient`] from an already-open file.
    pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
        // get the file length from metadata before reading
        let metadata = file.metadata().await?;
        let file_len = metadata.len();

        let mut reader = vec![];
        file.read_to_end(&mut reader).await?;

        Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
    }

    /// Returns the hash of the header with the highest block number, or `None` if empty.
    pub fn tip(&self) -> Option<B256> {
        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
    }

    /// Returns the hash of the header with the lowest block number, or `None` if empty.
    pub fn start(&self) -> Option<B256> {
        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
    }

    /// Returns the highest block number this client has, or `None` if empty.
    pub fn max_block(&self) -> Option<u64> {
        self.headers.keys().max().copied()
    }

    /// Returns the lowest block number this client has, or `None` if empty.
    pub fn min_block(&self) -> Option<u64> {
        self.headers.keys().min().copied()
    }

    /// Clones and seals the header with the highest block number, or `None` if empty.
    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
    }

    /// Returns true if the buffered block numbers form a gapless sequence.
    ///
    /// An empty client is trivially canonical.
    pub fn has_canonical_blocks(&self) -> bool {
        if self.headers.is_empty() {
            return true
        }
        let mut numbers = self.headers.keys().copied().collect::<Vec<_>>();
        numbers.sort_unstable();
        // every consecutive pair must differ by exactly one
        numbers.windows(2).all(|pair| pair[1] == pair[0] + 1)
    }

    /// Use the provided bodies as the file client's block body buffer.
    ///
    /// NOTE: this does not update `hash_to_number`; a body whose hash has no matching
    /// header entry will cause a panic in [`Self::bodies_iter_mut`].
    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
        self.bodies = bodies;
        self
    }

    /// Use the provided headers as the file client's header buffer, indexing each
    /// header's hash into the hash-to-number map.
    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
        self.headers = headers;
        self.hash_to_number
            .extend(self.headers.iter().map(|(number, header)| (header.hash_slow(), *number)));
        self
    }

    /// Returns the current number of buffered headers.
    pub fn headers_len(&self) -> usize {
        self.headers.len()
    }

    /// Returns the current number of buffered bodies.
    pub fn bodies_len(&self) -> usize {
        self.bodies.len()
    }

    /// Returns an iterator over all buffered headers.
    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
        self.headers.values()
    }

    /// Returns a mutable iterator over bodies paired with their block numbers.
    ///
    /// # Panics
    ///
    /// Panics if a buffered body's hash is missing from `hash_to_number` (possible
    /// after [`Self::with_bodies`], which does not update the index).
    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
        let bodies = &mut self.bodies;
        let numbers = &self.hash_to_number;
        // indexing panics on a missing hash; see `with_bodies`
        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
    }

    /// Returns the total number of transactions across all buffered bodies.
    pub fn total_transactions(&self) -> usize {
        self.bodies.values().map(|body| body.transactions().len()).sum()
    }
}
impl<B: FullBlock> FromReader for FileClient<B> {
    type Error = FileClientError;

    /// Initialize the [`FileClient`] from a reader of RLP-encoded blocks.
    ///
    /// Decodes whole blocks until the stream hits a partial block at the end of the
    /// chunk; those trailing bytes are returned in `remaining_bytes` so the caller can
    /// prepend them to the next chunk.
    fn from_reader<R>(
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // use with_capacity so the codec's internal buffer can hold the entire chunk
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        // bytes of a trailing partial block, handed back to the caller
        let mut remaining_bytes = vec![];

        // progress-logging bookkeeping (trace every 100_000 blocks)
        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    Err(FileClientError::Rlp(err, bytes)) => {
                        // an Rlp error at the chunk boundary is expected: stash the
                        // undecoded tail bytes and stop decoding this chunk
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        remaining_bytes = bytes;
                        break
                    }
                    // any other error (e.g. I/O) is fatal for this chunk
                    Err(err) => return Err(err),
                };
                let block_number = block.header().number();
                let block_hash = block.header().hash_slow();

                // add to the internal maps
                headers.insert(block.header().number(), block.header().clone());
                hash_to_number.insert(block_hash, block.header().number());
                bodies.insert(block_hash, block.body().clone());

                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: Self { headers, hash_to_number, bodies },
                remaining_bytes,
                // block data does not map many-to-one, so no highest block is tracked
                highest_block: None,
            })
        }
    }
}
impl<B: FullBlock> HeadersClient for FileClient<B> {
    type Header = B::Header;
    type Output = HeadersFut<B::Header>;

    /// Serves a header request entirely from the in-memory buffer.
    ///
    /// Returns [`RequestError::BadResponse`] if the start hash is unknown or any
    /// header in the requested range is missing. Priority is ignored — there is no
    /// real peer to prioritize.
    fn get_headers_with_priority(
        &self,
        request: HeadersRequest,
        _priority: Priority,
    ) -> Self::Output {
        // this just searches the buffer, and fails if it can't find the header
        let mut headers = Vec::new();
        trace!(target: "downloaders::file", request=?request, "Getting headers");

        // resolve a hash start point to its block number via the local index
        let start_num = match request.start {
            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
                Some(num) => *num,
                None => {
                    warn!(%hash, "Could not find starting block number for requested header hash");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            },
            BlockHashOrNumber::Number(num) => num,
        };

        let range = if request.limit == 1 {
            Either::Left(start_num..start_num + 1)
        } else {
            match request.direction {
                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
                HeadersDirection::Falling => {
                    // clamp the low end at genesis: `start_num - request.limit + 1`
                    // underflows (panics in debug, wraps in release) when the limit
                    // exceeds the distance to block 0
                    let lowest = (start_num + 1).saturating_sub(request.limit);
                    Either::Right((lowest..=start_num).rev())
                }
            }
        };

        trace!(target: "downloaders::file", range=?range, "Getting headers with range");

        for block_number in range {
            match self.headers.get(&block_number).cloned() {
                Some(header) => headers.push(header),
                None => {
                    warn!(number=%block_number, "Could not find header");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            }
        }

        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
    }
}
impl<B: FullBlock> BodiesClient for FileClient<B> {
    type Body = B::Body;
    type Output = BodiesFut<B::Body>;

    /// Looks up each requested hash in the local body buffer.
    ///
    /// Fails with [`RequestError::BadResponse`] on the first missing body; priority
    /// is ignored since there is no real peer.
    fn get_block_bodies_with_priority(
        &self,
        hashes: Vec<B256>,
        _priority: Priority,
    ) -> Self::Output {
        let mut bodies = Vec::with_capacity(hashes.len());
        for hash in hashes {
            let Some(body) = self.bodies.get(&hash) else {
                return Box::pin(async move { Err(RequestError::BadResponse) })
            };
            bodies.push(body.clone());
        }
        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
    }
}
impl<B: FullBlock> DownloadClient for FileClient<B> {
    /// There is no peer to penalize — a bad message can only mean the backing file
    /// is corrupt, so just log a warning.
    fn report_bad_message(&self, _peer_id: PeerId) {
        warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
    }

    /// The file stands in for exactly one "peer".
    fn num_connected_peers(&self) -> usize {
        1
    }
}
/// Reads a chain file in fixed-size byte chunks, decoding one [`FileClient`]
/// (or receipt client) per chunk.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File to read from.
    file: File,
    /// Number of file bytes not yet read into a chunk.
    file_byte_len: u64,
    /// Buffered chunk bytes; after decoding, holds the undecoded tail (e.g. a
    /// partial block) carried over into the next chunk.
    chunk: Vec<u8>,
    /// Maximum number of bytes per chunk.
    chunk_byte_len: u64,
    /// Highest decoded block number so far, threaded through receipt decoding
    /// (see [`Self::next_receipts_chunk`]); unused for block chunks.
    highest_block: Option<u64>,
}
impl ChunkedFileReader {
    /// Returns the number of file bytes not yet read into a chunk.
    pub const fn file_len(&self) -> u64 {
        self.file_byte_len
    }

    /// Opens the file at `path` for chunked reading, capping each chunk at
    /// `chunk_byte_len` bytes or [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] if `None`.
    pub async fn new<P: AsRef<Path>>(
        path: P,
        chunk_byte_len: Option<u64>,
    ) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
        Self::from_file(file, chunk_byte_len).await
    }

    /// Wraps an already-open file, recording its total byte length from metadata.
    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
        // get the remaining file length from metadata before any reads
        let metadata = file.metadata().await?;
        let file_byte_len = metadata.len();

        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
    }

    /// Target byte length of the next chunk: the configured cap, or everything that
    /// is left (remaining file bytes plus carried-over bytes) if that is smaller.
    fn chunk_len(&self) -> u64 {
        let Self { chunk_byte_len, file_byte_len, .. } = *self;
        // bytes left over from the previous chunk count toward this chunk's total
        let file_byte_len = file_byte_len + self.chunk.len() as u64;

        if chunk_byte_len > file_byte_len {
            // last chunk
            file_byte_len
        } else {
            chunk_byte_len
        }
    }

    /// Reads bytes from the file and buffers them as the next chunk to decode.
    /// Returns the byte length of the buffered chunk, or `None` when both the file
    /// and the carry-over buffer are exhausted.
    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
        if self.file_byte_len == 0 && self.chunk.is_empty() {
            // eof
            return Ok(None)
        }

        let chunk_target_len = self.chunk_len();
        let old_bytes_len = self.chunk.len() as u64;

        // number of fresh bytes to append after the carried-over bytes
        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

        // zero-extend the buffer so `read_exact` has space to fill
        let prev_read_bytes_len = self.chunk.len();
        self.chunk.extend(std::iter::repeat(0).take(new_read_bytes_target_len as usize));
        let reader = &mut self.chunk[prev_read_bytes_len..];

        // `read_exact` fills the reserved slice completely; `chunk_len` never requests
        // more than remains in the file, so this should not hit an unexpected EOF
        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
        let next_chunk_byte_len = self.chunk.len();

        // update remaining file length
        self.file_byte_len -= new_read_bytes_len;

        debug!(target: "downloaders::file",
            max_chunk_byte_len=self.chunk_byte_len,
            prev_read_bytes_len,
            new_read_bytes_target_len,
            new_read_bytes_len,
            next_chunk_byte_len,
            remaining_file_byte_len=self.file_byte_len,
            "new bytes were read from file"
        );
        Ok(Some(next_chunk_byte_len as u64))
    }

    /// Reads and decodes the next chunk, returning the decoded client or `None` at
    /// end of file. Undecoded tail bytes are carried over into the next chunk.
    pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReader,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        // decode the buffered chunk into a new client
        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;

        // save the undecoded tail (e.g. a partial block) for the next chunk
        self.chunk = remaining_bytes;

        Ok(Some(file_client))
    }

    /// Reads and decodes the next chunk of receipts, threading the highest decoded
    /// block number between chunks. Returns `None` at end of file.
    pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReceiptReader<D>,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        // decode the buffered chunk, resuming from the previous chunk's highest block
        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
                .await?;

        // save the undecoded tail and highest block for the next chunk
        self.chunk = remaining_bytes;
        self.highest_block = highest_block;

        Ok(Some(file_client))
    }
}
/// Constructs a file client (or similar decoded chunk) from an async reader.
pub trait FromReader {
    /// Error returned by the implementing client type; must absorb I/O errors.
    type Error: From<io::Error>;

    /// Decodes `num_bytes` worth of data from `reader` into a client, returning any
    /// undecoded trailing bytes alongside it.
    fn from_reader<B>(
        reader: B,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        Self: Sized,
        B: AsyncReadExt + Unpin;
}
/// Output of decoding one file chunk (see [`FromReader::from_reader`]).
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The client holding the successfully decoded part of the chunk.
    pub file_client: T,
    /// Bytes that could not be decoded, e.g. a partial block or partial receipt at
    /// the end of the chunk; prepended to the next chunk by the caller.
    pub remaining_bytes: Vec<u8>,
    /// Highest decoded block number, used when decoding data that maps many items
    /// to one block (e.g. receipts); `None` for block chunks.
    pub highest_block: Option<u64>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    // Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // generate some random blocks and store their headers
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // client is backed by an empty file; bodies come from the injected buffer
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient> =
            Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    // Headers injected with `with_headers` can serve a reverse header download
    // starting from a fork head.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        // build a 4-header chain: p3 (oldest) -> p0 (tip)
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient> = Arc::new(
            FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
                (0u64, p0.clone().unseal()),
                (1, p1.clone().unseal()),
                (2, p2.clone().unseal()),
                (3, p3.clone().unseal()),
            ])),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        // the stream is exhausted after the single batch
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    // A file of encoded blocks round-trips through the header downloader.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate some random blocks into a file
        let (file, headers, _) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // construct the headers downloader, anchored at the first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to put the batch in ascending order before comparing
        downloaded_headers.reverse();

        // the local head (first header) is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    // A file of encoded blocks round-trips through the bodies downloader.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // generate some random blocks into a file
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // the bodies downloader needs the headers present in the db
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    // Chunked reading with a random chunk size yields the same headers as a
    // single-shot read.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate some random blocks into a file
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // pick a random chunk size small enough to force multiple chunks
        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init the chunked reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // download each chunk's headers, chaining the local head across chunks
        while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct a headers downloader for this chunk
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // this chunk's tip becomes the next chunk's local head
            local_header = sync_target;

            // reverse to put the batch in ascending order before collecting
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the overall local head (first header) is not included in the responses
        assert_eq!(headers[1..], downloaded_headers);
    }
}