#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use crate::metrics::PayloadBuilderMetrics;
use alloy_consensus::constants::EMPTY_WITHDRAWALS;
use alloy_eips::{eip4895::Withdrawals, merge::SLOT_DURATION};
use alloy_primitives::{B256, U256};
use futures_core::ready;
use futures_util::FutureExt;
use reth_chainspec::EthereumHardforks;
use reth_evm::state_change::post_block_withdrawals_balance_increments;
use reth_payload_builder::{KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind};
use reth_primitives::{proofs, NodePrimitives, SealedHeader};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, StateProviderFactory};
use reth_revm::cached::CachedReads;
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use revm::{Database, State};
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{oneshot, Semaphore},
time::{Interval, Sleep},
};
use tracing::{debug, trace, warn};
mod metrics;
mod stack;
pub use stack::PayloadBuilderStack;
/// Generator that creates a [`BasicPayloadJob`] for each set of payload attributes it is given.
///
/// Every job receives clones of the client, pool, executor and builder stored here, and all
/// jobs share the same [`PayloadTaskGuard`].
#[derive(Debug)]
pub struct BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
/// Client used to look up the parent header of a new job; cloned into every job.
client: Client,
/// Transaction pool handle; cloned into every job.
pool: Pool,
/// Task spawner handed to every job, which uses it to spawn its build tasks.
executor: Tasks,
/// Interval, deadline and task-limit settings for the jobs created by this generator.
config: BasicPayloadJobGeneratorConfig,
/// Semaphore shared by all jobs, created with `config.max_payload_tasks` permits.
payload_task_guard: PayloadTaskGuard,
/// The payload builder; cloned into every job.
builder: Builder,
/// State captured by the most recent `on_new_state` call, tagged with the hash of the
/// committed tip it belongs to. `None` until the first notification arrives.
pre_cached: Option<PrecachedState>,
}
impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
    /// Creates a new generator from its parts.
    ///
    /// The shared [`PayloadTaskGuard`] is sized from `config.max_payload_tasks`, and no
    /// pre-cached state is available until `on_new_state` has been called.
    pub fn with_builder(
        client: Client,
        pool: Pool,
        executor: Tasks,
        config: BasicPayloadJobGeneratorConfig,
        builder: Builder,
    ) -> Self {
        Self {
            client,
            pool,
            executor,
            // Read the task limit before `config` is moved into the struct below.
            payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
            config,
            builder,
            pre_cached: None,
        }
    }

    /// Returns the maximum time a job for the given payload timestamp may run.
    ///
    /// This is the configured deadline plus the time remaining until `unix_timestamp`, where
    /// the latter is capped at three times the deadline. All arithmetic saturates, so an
    /// extremely large configured deadline clamps to [`Duration::MAX`] instead of panicking.
    #[inline]
    fn max_job_duration(&self, unix_timestamp: u64) -> Duration {
        let deadline = self.config.deadline;
        // Cap the wait for the timestamp so that a timestamp far in the future cannot extend
        // the job without bound.
        let until_timestamp_cap = deadline.saturating_mul(3);
        let until_timestamp = duration_until(unix_timestamp).min(until_timestamp_cap);
        deadline.saturating_add(until_timestamp)
    }

    /// Returns the instant at which a job for the given payload timestamp must stop.
    ///
    /// If adding [`Self::max_job_duration`] to the current instant would overflow, a fixed
    /// point roughly 30 years from now is used instead of panicking.
    #[inline]
    fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
        /// Fallback offset used when the computed deadline is not representable.
        const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

        let now = tokio::time::Instant::now();
        now.checked_add(self.max_job_duration(unix_timestamp)).unwrap_or_else(|| now + FAR_FUTURE)
    }

    /// Returns a reference to the task spawner.
    pub const fn tasks(&self) -> &Tasks {
        &self.executor
    }

    /// Returns a clone of the pre-cached reads if they were captured for block `parent`.
    fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
        let pre_cached = self.pre_cached.as_ref()?;
        (pre_cached.block == parent).then(|| pre_cached.cached.clone())
    }
}
impl<Client, Pool, Tasks, Builder> PayloadJobGenerator
    for BasicPayloadJobGenerator<Client, Pool, Tasks, Builder>
where
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = alloy_consensus::Header>
        + Clone
        + Unpin
        + 'static,
    Pool: TransactionPool + Unpin + 'static,
    Tasks: TaskSpawner + Clone + Unpin + 'static,
    Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
    <Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
    <Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
    type Job = BasicPayloadJob<Client, Pool, Tasks, Builder>;

    /// Creates a new payload job for `attributes` and immediately spawns its first build.
    ///
    /// # Errors
    ///
    /// Returns the provider error if the header lookup fails, or
    /// [`PayloadBuilderError::MissingParentHeader`] if no parent header is found.
    fn new_payload_job(
        &self,
        attributes: <Self::Job as PayloadJob>::PayloadAttributes,
    ) -> Result<Self::Job, PayloadBuilderError> {
        let requested_parent = attributes.parent();
        let parent_header = if requested_parent.is_zero() {
            // A zero parent hash cannot be looked up by hash, so the latest header is used.
            self.client
                .latest_header()
                .map_err(PayloadBuilderError::from)?
                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(B256::ZERO))?
        } else {
            self.client
                .sealed_header_by_hash(requested_parent)
                .map_err(PayloadBuilderError::from)?
                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(requested_parent))?
        };

        // Read the hash up front so the header can be moved into the `Arc` without a clone.
        let parent_hash = parent_header.hash();
        let config = PayloadConfig::new(Arc::new(parent_header), attributes);

        let until = self.job_deadline(config.attributes.timestamp());
        let deadline = Box::pin(tokio::time::sleep_until(until));

        // Only reuse the pre-cached state if it was captured for exactly this parent block.
        let cached_reads = self.maybe_pre_cached(parent_hash);

        let mut job = BasicPayloadJob {
            config,
            client: self.client.clone(),
            pool: self.pool.clone(),
            executor: self.executor.clone(),
            deadline,
            interval: tokio::time::interval(self.config.interval),
            best_payload: PayloadState::Missing,
            pending_block: None,
            cached_reads,
            payload_task_guard: self.payload_task_guard.clone(),
            metrics: Default::default(),
            builder: self.builder.clone(),
        };

        // Start the first build right away instead of waiting for the job to be polled.
        job.spawn_build_job();

        Ok(job)
    }

    /// Replaces the pre-cached state with the accounts and storage of the committed chain.
    ///
    /// Only bundle accounts that carry account info are cached; for each of them the present
    /// value of every storage slot in the bundle is recorded. The cache is tagged with the hash
    /// of the committed tip.
    fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
        let committed = new_state.committed();
        let mut cached = CachedReads::default();

        for (addr, acc) in committed.execution_outcome().bundle_accounts_iter() {
            // Bundle accounts without info are skipped.
            let Some(info) = acc.info.clone() else { continue };
            let storage =
                acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
            cached.insert_account(addr, info, storage);
        }

        self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
    }
}
/// Cached state reads together with the block they were captured for.
#[derive(Debug, Clone)]
pub struct PrecachedState {
/// Hash of the block the cached state belongs to.
pub block: B256,
/// The cached account and storage reads.
pub cached: CachedReads,
}
/// Shared semaphore handle that limits how many payload build tasks run at once.
///
/// Clones refer to the same underlying [`Semaphore`].
#[derive(Debug, Clone)]
pub struct PayloadTaskGuard(Arc<Semaphore>);

impl Deref for PayloadTaskGuard {
    type Target = Semaphore;

    fn deref(&self) -> &Semaphore {
        self.0.as_ref()
    }
}

impl PayloadTaskGuard {
    /// Creates a guard whose semaphore starts with `max_payload_tasks` permits.
    pub fn new(max_payload_tasks: usize) -> Self {
        let semaphore = Semaphore::new(max_payload_tasks);
        Self(Arc::new(semaphore))
    }
}
/// Settings for the [`BasicPayloadJobGenerator`].
#[derive(Debug, Clone)]
pub struct BasicPayloadJobGeneratorConfig {
    /// Period of the interval timer on which a job considers starting a new build.
    interval: Duration,
    /// Base duration used to compute the point at which a job stops.
    deadline: Duration,
    /// Number of permits of the shared payload task semaphore.
    max_payload_tasks: usize,
}

impl BasicPayloadJobGeneratorConfig {
    /// Sets the build interval.
    pub const fn interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }

    /// Sets the job deadline.
    pub const fn deadline(self, deadline: Duration) -> Self {
        Self { deadline, ..self }
    }

    /// Sets the maximum number of concurrently running payload tasks.
    ///
    /// # Panics
    ///
    /// Panics if `max_payload_tasks` is zero.
    pub fn max_payload_tasks(self, max_payload_tasks: usize) -> Self {
        assert!(max_payload_tasks != 0, "max_payload_tasks must be greater than 0");
        Self { max_payload_tasks, ..self }
    }
}

impl Default for BasicPayloadJobGeneratorConfig {
    /// One second interval, `SLOT_DURATION` deadline and three concurrent tasks.
    fn default() -> Self {
        let interval = Duration::from_secs(1);
        Self { interval, deadline: SLOT_DURATION, max_payload_tasks: 3 }
    }
}
/// A payload job that repeatedly builds payloads until its deadline and keeps the best one.
///
/// The job is a [`Future`]: polling it spawns build tasks on the interval, collects their
/// results, and completes once the deadline is reached.
#[derive(Debug)]
pub struct BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Builder: PayloadBuilder<Pool, Client>,
{
/// Parent header and attributes of the payload being built.
config: PayloadConfig<Builder::Attributes>,
/// Client handle; cloned into every build task.
client: Client,
/// Transaction pool handle; cloned into every build task.
pool: Pool,
/// Spawner used to run build tasks.
executor: Tasks,
/// Sleep that fires when the job must stop.
deadline: Pin<Box<Sleep>>,
/// Interval on which a new build is considered.
interval: Interval,
/// Best payload produced so far, and whether it is frozen.
best_payload: PayloadState<Builder::BuiltPayload>,
/// The build currently in flight, if any.
pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
/// Shared semaphore; each build task acquires a permit before building.
payload_task_guard: PayloadTaskGuard,
/// Cached reads handed to the next build task; taken when a task is spawned and restored
/// from `Better` and `Aborted` build outcomes.
cached_reads: Option<CachedReads>,
/// Metrics counters for this job.
metrics: PayloadBuilderMetrics,
/// The builder that performs the actual payload construction.
builder: Builder,
}
impl<Client, Pool, Tasks, Builder> BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
fn spawn_build_job(&mut self) {
trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let pool = self.pool.clone();
let cancel = Cancelled::default();
let _cancel = cancel.clone();
let guard = self.payload_task_guard.clone();
let payload_config = self.config.clone();
let best_payload = self.best_payload.payload().cloned();
self.metrics.inc_initiated_payload_builds();
let cached_reads = self.cached_reads.take().unwrap_or_default();
let builder = self.builder.clone();
self.executor.spawn_blocking(Box::pin(async move {
let _permit = guard.acquire().await;
let args = BuildArguments {
client,
pool,
cached_reads,
config: payload_config,
cancel,
best_payload,
};
let result = builder.try_build(args);
let _ = tx.send(result);
}));
self.pending_block = Some(PendingPayload { _cancel, payload: rx });
}
}
impl<Client, Pool, Tasks, Builder> Future for BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
type Output = Result<(), PayloadBuilderError>;
/// Drives the job.
///
/// Each poll checks the deadline, spawns a new build on ready interval ticks when none is
/// in flight, and then polls the in-flight build. The future resolves with `Ok(())` only
/// when the deadline is reached; build failures are logged and counted, never returned.
///
/// # Panics
///
/// Panics if a finished build reports [`BuildOutcome::Cancelled`].
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// The deadline is checked first: once it fires the job completes and nothing else in this
// function runs.
if this.deadline.as_mut().poll(cx).is_ready() {
trace!(target: "payload_builder", "payload building deadline reached");
return Poll::Ready(Ok(()))
}
// Consume every tick that is ready right now. A build is started only if none is in flight
// and the best payload is not frozen. The loop ends when `poll_tick` returns `Pending`.
while this.interval.poll_tick(cx).is_ready() {
if this.pending_block.is_none() && !this.best_payload.is_frozen() {
this.spawn_build_job();
}
}
// Move the in-flight build out of the job to poll it; it is put back only if still pending.
if let Some(mut fut) = this.pending_block.take() {
match fut.poll_unpin(cx) {
Poll::Ready(Ok(outcome)) => match outcome {
BuildOutcome::Better { payload, cached_reads } => {
// Keep the returned cache so the next spawned build can take it.
this.cached_reads = Some(cached_reads);
debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
this.best_payload = PayloadState::Best(payload);
}
BuildOutcome::Freeze(payload) => {
// A frozen payload stops the interval branch above from spawning further builds.
debug!(target: "payload_builder", "payload frozen, no further building will occur");
this.best_payload = PayloadState::Frozen(payload);
}
BuildOutcome::Aborted { fees, cached_reads } => {
// The best payload is left untouched; only the cache is kept.
this.cached_reads = Some(cached_reads);
trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
}
BuildOutcome::Cancelled => {
// NOTE(review): this treats a `Cancelled` outcome as impossible while the job
// still holds the pending payload — confirm that every builder upholds this.
unreachable!("the cancel signal never fired")
}
},
Poll::Ready(Err(error)) => {
// The failure is only logged and counted. `pending_block` stays `None`, so a later
// tick can spawn a new build.
debug!(target: "payload_builder", %error, "payload build attempt failed");
this.metrics.inc_failed_payload_builds();
}
Poll::Pending => {
this.pending_block = Some(fut);
}
}
}
// Only the deadline branch at the top completes this future.
Poll::Pending
}
}
impl<Client, Pool, Tasks, Builder> PayloadJob for BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
type PayloadAttributes = Builder::Attributes;
type ResolvePayloadFuture = ResolveBestPayload<Self::BuiltPayload>;
type BuiltPayload = Builder::BuiltPayload;
/// Returns a clone of the best payload built so far.
///
/// If no payload has been built yet, the empty-payload metric is incremented and an empty
/// payload is built synchronously through the builder.
fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
if let Some(payload) = self.best_payload.payload() {
Ok(payload.clone())
} else {
self.metrics.inc_requested_empty_payload();
self.builder.build_empty_payload(&self.client, self.config.clone())
}
}
/// Returns a clone of the attributes this job builds a payload for.
fn payload_attributes(&self) -> Result<Self::PayloadAttributes, PayloadBuilderError> {
Ok(self.config.attributes.clone())
}
/// Creates the future that resolves this job to a payload.
///
/// The returned [`ResolveBestPayload`] carries the current best payload (if any), the build
/// that is in flight (if any), and optionally a fallback payload that is built when no best
/// payload exists yet. The job always asks to be terminated (`KeepPayloadJobAlive::No`).
fn resolve_kind(
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let best_payload = self.best_payload.payload().cloned();
// Without a best payload and without a build in flight there would be nothing to wait for,
// so start a build now.
if best_payload.is_none() && self.pending_block.is_none() {
self.spawn_build_job();
}
// Hand the in-flight build over to the resolve future.
let maybe_better = self.pending_block.take();
let mut empty_payload = None;
if best_payload.is_none() {
debug!(target: "payload_builder", id=%self.config.payload_id(), "no best payload yet to resolve, building empty payload");
let args = BuildArguments {
client: self.client.clone(),
pool: self.pool.clone(),
cached_reads: self.cached_reads.take().unwrap_or_default(),
config: self.config.clone(),
cancel: Cancelled::default(),
best_payload: None,
};
// The builder decides what to do while no payload is available.
match self.builder.on_missing_payload(args) {
MissingPayloadBehaviour::AwaitInProgress => {
// No fallback is started; only the in-flight build can produce a payload.
debug!(target: "payload_builder", id=%self.config.payload_id(), "awaiting in progress payload build job");
}
MissingPayloadBehaviour::RaceEmptyPayload => {
debug!(target: "payload_builder", id=%self.config.payload_id(), "racing empty payload");
self.metrics.inc_requested_empty_payload();
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let config = self.config.clone();
let builder = self.builder.clone();
// Build the empty payload on the executor; the result arrives over the channel.
self.executor.spawn_blocking(Box::pin(async move {
let res = builder.build_empty_payload(&client, config);
let _ = tx.send(res);
}));
empty_payload = Some(rx);
}
MissingPayloadBehaviour::RacePayload(job) => {
debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload");
let (tx, rx) = oneshot::channel();
// Run the builder-provided fallback closure on the executor.
self.executor.spawn_blocking(Box::pin(async move {
let _ = tx.send(job());
}));
empty_payload = Some(rx);
}
};
}
let fut = ResolveBestPayload {
best_payload,
maybe_better,
// For `WaitForPending` the fallback receiver is dropped here, so only the in-flight
// build can resolve the future. A fallback task spawned above still runs.
empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
};
(fut, KeepPayloadJobAlive::No)
}
}
/// The best payload a job currently holds, if any.
#[derive(Debug, Clone)]
pub enum PayloadState<P> {
    /// No payload has been built yet.
    Missing,
    /// The best payload built so far.
    Best(P),
    /// A payload that is final; it is still reported as the current payload.
    Frozen(P),
}

impl<P> PayloadState<P> {
    /// Returns `true` only for the [`PayloadState::Frozen`] variant.
    pub const fn is_frozen(&self) -> bool {
        match self {
            Self::Frozen(_) => true,
            Self::Missing | Self::Best(_) => false,
        }
    }

    /// Returns the held payload, or `None` if the state is [`PayloadState::Missing`].
    pub const fn payload(&self) -> Option<&P> {
        match self {
            Self::Best(held) => Some(held),
            Self::Frozen(held) => Some(held),
            Self::Missing => None,
        }
    }
}
/// Future that resolves a payload job to a single payload.
///
/// It is considered empty when none of its three sources is set.
#[derive(Debug)]
pub struct ResolveBestPayload<Payload> {
    /// Best payload known when the future was created.
    pub best_payload: Option<Payload>,
    /// Build that was still in flight when the future was created.
    pub maybe_better: Option<PendingPayload<Payload>>,
    /// Receiver for a fallback payload.
    pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
}

impl<Payload> ResolveBestPayload<Payload> {
    /// Returns `true` if there is no source left that could yield a payload.
    const fn is_empty(&self) -> bool {
        let any_source_left = self.best_payload.is_some() ||
            self.maybe_better.is_some() ||
            self.empty_payload.is_some();
        !any_source_left
    }
}
impl<Payload> Future for ResolveBestPayload<Payload>
where
Payload: Unpin,
{
type Output = Result<Payload, PayloadBuilderError>;
/// Resolves to a payload, trying the sources in a fixed order.
///
/// 1. The in-flight build, if it has finished and produced a payload.
/// 2. The best payload known when the future was created.
/// 3. The fallback payload, once its receiver is ready.
///
/// If all three sources are exhausted the future resolves to
/// [`PayloadBuilderError::MissingPayload`].
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Give the in-flight build the first chance.
if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
// The build is done either way; clear it so it is never polled again.
this.maybe_better = None;
// A failed build is only logged, and an outcome without a payload is ignored; both
// fall through to the sources below.
if let Ok(Some(payload)) = res.map(|out| out.into_payload())
.inspect_err(|err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"))
{
debug!(target: "payload_builder", "resolving better payload");
return Poll::Ready(Ok(payload))
}
}
}
// If a best payload exists it is returned right away, even while the build above is still
// pending.
if let Some(best) = this.best_payload.take() {
debug!(target: "payload_builder", "resolving best payload");
return Poll::Ready(Ok(best))
}
// Otherwise fall back to the fallback payload, if one was requested.
if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
this.empty_payload = None;
// Whatever the fallback produced is final, including a build error or a closed
// channel.
return match res {
Ok(res) => {
if let Err(err) = &res {
warn!(target: "payload_builder", %err, "failed to resolve empty payload");
} else {
debug!(target: "payload_builder", "resolving empty payload");
}
Poll::Ready(res)
}
Err(err) => Poll::Ready(Err(err.into())),
}
}
}
// Nothing is left that could ever produce a payload.
if this.is_empty() {
return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
}
Poll::Pending
}
}
/// Handle to a build that is in flight.
///
/// Dropping the handle drops its [`Cancelled`] value, which sets the shared cancel flag.
#[derive(Debug)]
pub struct PendingPayload<P> {
    /// Held only so that dropping this handle drops it.
    _cancel: Cancelled,
    /// Receives the result of the build.
    payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
}

impl<P> PendingPayload<P> {
    /// Creates a pending payload from a cancel handle and the result receiver.
    pub const fn new(
        cancel: Cancelled,
        payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
    ) -> Self {
        Self { payload, _cancel: cancel }
    }
}

impl<P> Future for PendingPayload<P> {
    type Output = Result<BuildOutcome<P>, PayloadBuilderError>;

    /// Resolves to the build result; a receive error is converted into a
    /// [`PayloadBuilderError`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match ready!(self.payload.poll_unpin(cx)) {
            Ok(build_result) => Poll::Ready(build_result),
            Err(recv_error) => Poll::Ready(Err(recv_error.into())),
        }
    }
}
/// Shared cancellation flag.
///
/// All clones share one flag, and dropping any of them sets it.
#[derive(Default, Clone, Debug)]
pub struct Cancelled(Arc<AtomicBool>);

impl Cancelled {
    /// Returns `true` once the shared flag has been set.
    pub fn is_cancelled(&self) -> bool {
        use std::sync::atomic::Ordering;

        let flag: &AtomicBool = &self.0;
        flag.load(Ordering::Relaxed)
    }
}

impl Drop for Cancelled {
    /// Sets the shared flag.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let flag: &AtomicBool = &self.0;
        flag.store(true, Ordering::Relaxed);
    }
}
#[derive(Clone, Debug)]
pub struct PayloadConfig<Attributes> {
pub parent_header: Arc<SealedHeader>,
pub attributes: Attributes,
}
impl<Attributes> PayloadConfig<Attributes>
where
Attributes: PayloadBuilderAttributes,
{
pub const fn new(parent_header: Arc<SealedHeader>, attributes: Attributes) -> Self {
Self { parent_header, attributes }
}
pub fn payload_id(&self) -> PayloadId {
self.attributes.payload_id()
}
}
/// Result of a single payload build attempt.
#[derive(Debug)]
pub enum BuildOutcome<Payload> {
    /// A payload was built that replaces the current best one.
    Better {
        /// The newly built payload.
        payload: Payload,
        /// Cached reads returned by the build.
        cached_reads: CachedReads,
    },
    /// The build was abandoned without producing a payload.
    Aborted {
        /// Fees reported for the abandoned build.
        fees: U256,
        /// Cached reads returned by the build.
        cached_reads: CachedReads,
    },
    /// The build reported cancellation.
    Cancelled,
    /// A payload was built and is final.
    Freeze(Payload),
}

impl<Payload> BuildOutcome<Payload> {
    /// Returns the payload of a `Better` or `Freeze` outcome, `None` otherwise.
    pub fn into_payload(self) -> Option<Payload> {
        match self {
            Self::Better { payload, .. } => Some(payload),
            Self::Freeze(payload) => Some(payload),
            Self::Aborted { .. } | Self::Cancelled => None,
        }
    }

    /// Returns `true` for the `Better` variant.
    pub const fn is_better(&self) -> bool {
        match self {
            Self::Better { .. } => true,
            Self::Aborted { .. } | Self::Cancelled | Self::Freeze(_) => false,
        }
    }

    /// Returns `true` for the `Aborted` variant.
    pub const fn is_aborted(&self) -> bool {
        match self {
            Self::Aborted { .. } => true,
            Self::Better { .. } | Self::Cancelled | Self::Freeze(_) => false,
        }
    }

    /// Returns `true` for the `Cancelled` variant.
    pub const fn is_cancelled(&self) -> bool {
        match self {
            Self::Cancelled => true,
            Self::Better { .. } | Self::Aborted { .. } | Self::Freeze(_) => false,
        }
    }

    /// Applies `f` to the payload of a `Better` or `Freeze` outcome and leaves the other
    /// variants as they are.
    pub(crate) fn map_payload<F, P>(self, f: F) -> BuildOutcome<P>
    where
        F: FnOnce(Payload) -> P,
    {
        match self {
            Self::Freeze(payload) => BuildOutcome::Freeze(f(payload)),
            Self::Better { payload, cached_reads } => {
                let payload = f(payload);
                BuildOutcome::Better { payload, cached_reads }
            }
            Self::Cancelled => BuildOutcome::Cancelled,
            Self::Aborted { fees, cached_reads } => BuildOutcome::Aborted { fees, cached_reads },
        }
    }
}
/// Variant of [`BuildOutcome`] that does not carry cached reads.
#[derive(Debug)]
pub enum BuildOutcomeKind<Payload> {
    /// A payload was built that replaces the current best one.
    Better {
        /// The newly built payload.
        payload: Payload,
    },
    /// The build was abandoned without producing a payload.
    Aborted {
        /// Fees reported for the abandoned build.
        fees: U256,
    },
    /// The build reported cancellation.
    Cancelled,
    /// A payload was built and is final.
    Freeze(Payload),
}

impl<Payload> BuildOutcomeKind<Payload> {
    /// Converts into the matching [`BuildOutcome`].
    ///
    /// `cached_reads` is attached to `Better` and `Aborted`; for the other two variants it is
    /// dropped.
    pub fn with_cached_reads(self, cached_reads: CachedReads) -> BuildOutcome<Payload> {
        match self {
            Self::Cancelled => BuildOutcome::Cancelled,
            Self::Freeze(payload) => BuildOutcome::Freeze(payload),
            Self::Aborted { fees } => BuildOutcome::Aborted { fees, cached_reads },
            Self::Better { payload } => BuildOutcome::Better { payload, cached_reads },
        }
    }
}
/// Everything a payload builder receives for one build attempt.
#[derive(Debug)]
pub struct BuildArguments<Pool, Client, Attributes, Payload> {
    /// Client handle.
    pub client: Client,
    /// Transaction pool handle.
    pub pool: Pool,
    /// Cached reads made available to this build.
    pub cached_reads: CachedReads,
    /// Parent header and attributes of the payload to build.
    pub config: PayloadConfig<Attributes>,
    /// Cancellation flag for this build.
    pub cancel: Cancelled,
    /// The best payload built so far, if any.
    pub best_payload: Option<Payload>,
}

impl<Pool, Client, Attributes, Payload> BuildArguments<Pool, Client, Attributes, Payload> {
    /// Creates build arguments from their parts.
    pub const fn new(
        client: Client,
        pool: Pool,
        cached_reads: CachedReads,
        config: PayloadConfig<Attributes>,
        cancel: Cancelled,
        best_payload: Option<Payload>,
    ) -> Self {
        Self { best_payload, cancel, config, cached_reads, pool, client }
    }

    /// Replaces the pool with `pool`, keeping every other field.
    pub fn with_pool<P>(self, pool: P) -> BuildArguments<P, Client, Attributes, Payload> {
        // The previous pool is not bound here and is dropped together with the rest of `self`.
        let Self { client, pool: _, cached_reads, config, cancel, best_payload } = self;
        BuildArguments { client, pool, cached_reads, config, cancel, best_payload }
    }

    /// Transforms the pool with `f`, keeping every other field.
    pub fn map_pool<F, P>(self, f: F) -> BuildArguments<P, Client, Attributes, Payload>
    where
        F: FnOnce(Pool) -> P,
    {
        let Self { client, pool, cached_reads, config, cancel, best_payload } = self;
        BuildArguments { client, pool: f(pool), cached_reads, config, cancel, best_payload }
    }
}
/// Builds payloads for a [`BasicPayloadJob`].
///
/// The builder is cloned into every job, and from there into every spawned build task.
pub trait PayloadBuilder<Pool, Client>: Send + Sync + Clone {
/// The payload attributes type accepted by this builder.
type Attributes: PayloadBuilderAttributes;
/// The payload type produced by this builder.
type BuiltPayload: BuiltPayload;
/// Attempts to build a payload from the given arguments.
///
/// The job calls this from a spawned build task and acts on the returned [`BuildOutcome`];
/// an `Err` is logged and counted as a failed build.
fn try_build(
&self,
args: BuildArguments<Pool, Client, Self::Attributes, Self::BuiltPayload>,
) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;
/// Decides what happens when a job is resolved before any payload has been built.
///
/// The default implementation ignores the arguments and returns
/// [`MissingPayloadBehaviour::RaceEmptyPayload`].
fn on_missing_payload(
&self,
_args: BuildArguments<Pool, Client, Self::Attributes, Self::BuiltPayload>,
) -> MissingPayloadBehaviour<Self::BuiltPayload> {
MissingPayloadBehaviour::RaceEmptyPayload
}
/// Builds an empty payload for the given config.
///
/// The job uses this as a fallback when a payload is requested before any has been built.
fn build_empty_payload(
&self,
client: &Client,
config: PayloadConfig<Self::Attributes>,
) -> Result<Self::BuiltPayload, PayloadBuilderError>;
}
/// What to do when a job is resolved before any payload has been built.
pub enum MissingPayloadBehaviour<Payload> {
    /// Start no fallback and rely on the build that is in flight.
    AwaitInProgress,
    /// Build an empty payload as a fallback.
    RaceEmptyPayload,
    /// Run the given closure to produce a fallback payload.
    RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
}

impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
    /// Writes only the variant name; the closure of `RacePayload` is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let variant_name = match self {
            Self::AwaitInProgress => "AwaitInProgress",
            Self::RaceEmptyPayload => "RaceEmptyPayload",
            Self::RacePayload(_) => "RacePayload",
        };
        f.write_str(variant_name)
    }
}

impl<Payload> Default for MissingPayloadBehaviour<Payload> {
    /// Defaults to [`MissingPayloadBehaviour::RaceEmptyPayload`].
    fn default() -> Self {
        Self::RaceEmptyPayload
    }
}
/// Applies the withdrawals to the state and returns the withdrawals root.
///
/// - Returns `Ok(None)` if Shanghai is not active at `timestamp`.
/// - Returns `EMPTY_WITHDRAWALS` without touching the state if `withdrawals` is empty.
/// - Otherwise increments the affected balances in `db` and returns the computed root.
///
/// # Errors
///
/// Returns the database error if incrementing the balances fails.
pub fn commit_withdrawals<DB, ChainSpec>(
    db: &mut State<DB>,
    chain_spec: &ChainSpec,
    timestamp: u64,
    withdrawals: &Withdrawals,
) -> Result<Option<B256>, DB::Error>
where
    DB: Database,
    ChainSpec: EthereumHardforks,
{
    let shanghai_active = chain_spec.is_shanghai_active_at_timestamp(timestamp);
    if !shanghai_active {
        return Ok(None)
    }

    let withdrawals_root = if withdrawals.is_empty() {
        EMPTY_WITHDRAWALS
    } else {
        let increments =
            post_block_withdrawals_balance_increments(chain_spec, timestamp, withdrawals);
        db.increment_balances(increments)?;
        proofs::calculate_withdrawals_root(withdrawals)
    };

    Ok(Some(withdrawals_root))
}
/// Returns `true` if `new_fees` strictly exceeds the fees of `best_payload`, or if there is no
/// best payload yet.
#[inline(always)]
pub fn is_better_payload<T: BuiltPayload>(best_payload: Option<&T>, new_fees: U256) -> bool {
    match best_payload {
        Some(current_best) => new_fees > current_best.fees(),
        None => true,
    }
}
/// Returns the time remaining until the given unix timestamp (in seconds).
///
/// The result is zero if the timestamp is not in the future. A system clock that reads before
/// the unix epoch is treated as being exactly at the epoch.
fn duration_until(unix_timestamp_secs: u64) -> Duration {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    Duration::from_secs(unix_timestamp_secs).saturating_sub(since_epoch)
}