use alloy_primitives::TxHash;
use futures_util::{stream::Fuse, StreamExt};
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{sync::mpsc::Receiver, time::Interval};
use tokio_stream::{wrappers::ReceiverStream, Stream};
/// The strategy used to decide when a batch of transactions is handed out for the next block.
///
/// [`MiningMode::poll`] dispatches on the active variant.
#[derive(Debug)]
pub enum MiningMode {
/// A mode that never produces anything; polling it always returns `Poll::Pending`.
None,
/// A mode driven by a channel of transaction hashes; each poll hands out at most a configured
/// number of transactions taken from the pool.
Auto(ReadyTransactionMiner),
/// A mode driven by a timer; every tick hands out the transactions the pool currently offers.
FixedBlockTime(FixedBlockTimeMiner),
}
impl MiningMode {
    /// Builds a [`MiningMode::Auto`] mode.
    ///
    /// `listener` is wrapped into a fused stream and is only used as a notification source;
    /// `max_transactions` caps how many transactions a single poll may return. The miner starts
    /// with no knowledge about pending transactions (`has_pending_txs` is `None`).
    pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
        let rx = ReceiverStream::new(listener).fuse();
        let miner = ReadyTransactionMiner { max_transactions, has_pending_txs: None, rx };
        Self::Auto(miner)
    }

    /// Builds a [`MiningMode::FixedBlockTime`] mode that ticks once every `duration`.
    pub fn interval(duration: Duration) -> Self {
        let miner = FixedBlockTimeMiner::new(duration);
        Self::FixedBlockTime(miner)
    }

    /// Polls the active mode for the transactions of the next block.
    ///
    /// Returns `Poll::Ready` with the selected transactions when the underlying miner decides it
    /// is time to produce a block, and `Poll::Pending` otherwise. [`MiningMode::None`] is always
    /// pending.
    pub(crate) fn poll<Pool>(
        &mut self,
        pool: &Pool,
        cx: &mut Context<'_>,
    ) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
    where
        Pool: TransactionPool,
    {
        match self {
            Self::FixedBlockTime(timed) => timed.poll(pool, cx),
            Self::Auto(ready) => ready.poll(pool, cx),
            Self::None => Poll::Pending,
        }
    }
}
impl fmt::Display for MiningMode {
    /// Writes the bare variant name (`None`, `Auto` or `FixedBlockTime`) without any payload.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::None => "None",
            Self::Auto(_) => "Auto",
            Self::FixedBlockTime(_) => "FixedBlockTime",
        })
    }
}
/// A miner that hands out the pool's transactions every time its timer ticks.
#[derive(Debug)]
pub struct FixedBlockTimeMiner {
/// Timer whose ticks trigger the production of a new batch.
interval: Interval,
}
impl FixedBlockTimeMiner {
    /// Creates a miner whose timer has a period of `duration`.
    ///
    /// The timer is scheduled to start one full `duration` from now rather than immediately.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval_at` panics when given a zero period, so `duration` must be
    /// non-zero.
    pub(crate) fn new(duration: Duration) -> Self {
        let first_tick = tokio::time::Instant::now() + duration;
        let interval = tokio::time::interval_at(first_tick, duration);
        Self { interval }
    }

    /// Polls the timer and, when it has ticked, collects everything yielded by
    /// `pool.best_transactions()`.
    ///
    /// Returns `Poll::Pending` while the timer has not ticked yet.
    fn poll<Pool>(
        &mut self,
        pool: &Pool,
        cx: &mut Context<'_>,
    ) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
    where
        Pool: TransactionPool,
    {
        match self.interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(pool.best_transactions().collect()),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl Default for FixedBlockTimeMiner {
    /// Creates a miner with a block time of six seconds.
    fn default() -> Self {
        const DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(6);
        Self::new(DEFAULT_BLOCK_TIME)
    }
}
/// A miner that hands out transactions from the pool when notified through a channel of
/// transaction hashes.
pub struct ReadyTransactionMiner {
/// Upper bound on the number of transactions returned by a single poll.
max_transactions: usize,
/// Tracks whether the pool is worth querying:
/// `None` until the first poll reaches the pool, `Some(true)` after a notification arrived or
/// the last poll filled a complete batch, `Some(false)` when the last poll took fewer than
/// `max_transactions` transactions.
has_pending_txs: Option<bool>,
/// Fused stream of transaction hashes; the hash values themselves are discarded and only
/// serve as a signal that the pool should be queried again.
rx: Fuse<ReceiverStream<TxHash>>,
}
impl ReadyTransactionMiner {
    /// Polls for the next batch of at most `max_transactions` transactions.
    ///
    /// All queued notifications are consumed first. The pool is then queried unless the previous
    /// poll left it with fewer than a full batch and no notification has arrived since. Returns
    /// `Poll::Pending` when the pool is skipped or yields nothing.
    fn poll<Pool>(
        &mut self,
        pool: &Pool,
        cx: &mut Context<'_>,
    ) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
    where
        Pool: TransactionPool,
    {
        // Consume every notification that is already queued; any of them means there is
        // something to look for in the pool.
        loop {
            match Pin::new(&mut self.rx).poll_next(cx) {
                Poll::Ready(Some(_hash)) => self.has_pending_txs = Some(true),
                Poll::Ready(None) | Poll::Pending => break,
            }
        }

        // The last batch was not full and nothing new was announced: skip the pool.
        if matches!(self.has_pending_txs, Some(false)) {
            return Poll::Pending
        }

        let limit = self.max_transactions;
        let batch: Vec<_> = pool.best_transactions().take(limit).collect();

        // A full batch means the pool may hold more than was taken.
        self.has_pending_txs = Some(batch.len() >= limit);

        if batch.is_empty() {
            Poll::Pending
        } else {
            Poll::Ready(batch)
        }
    }
}
impl fmt::Debug for ReadyTransactionMiner {
    /// Prints only `max_transactions`; the remaining fields are elided and the output is marked
    /// as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ReadyTransactionMiner");
        builder.field("max_transactions", &self.max_transactions);
        builder.finish_non_exhaustive()
    }
}