Skip to main content

reth_node_ethereum/
engine_ssz_proxy.rs

1//! HTTP SSZ transport proxy for the authenticated Engine API server.
2//!
3//! Implements the [EIP-8178] SSZ Engine API routes under `/engine/v2`.
4//!
5//! [EIP-8178]: https://eips.ethereum.org/EIPS/eip-8178
6
7use alloy_consensus::{Transaction, TxEnvelope};
8use alloy_eips::{
9    eip2718::Decodable2718,
10    eip7685::{Requests, RequestsOrHash},
11};
12use alloy_primitives::{Bytes, B128, B256};
13use alloy_rpc_types_engine::{
14    CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadSidecar,
15    ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ExecutionPayloadV4,
16    ForkchoiceState, PayloadAttributes, PraguePayloadFields,
17};
18use http_body_util::BodyExt;
19use jsonrpsee::server::{HttpBody, HttpRequest, HttpResponse};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::EngineApiValidator;
22use reth_ethereum_engine_primitives::EthEngineTypes;
23use reth_provider::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
24use reth_rpc::EngineApi;
25use reth_transaction_pool::TransactionPool;
26use ssz::Decode;
27use std::{
28    future::Future,
29    pin::Pin,
30    sync::Arc,
31    task::{Context, Poll},
32};
33use tokio::sync::RwLock;
34use tower::{BoxError, Layer, Service};
35
36const OCTET_STREAM: &str = "application/octet-stream";
37const APPLICATION_JSON: &str = "application/json";
38const TEXT_PLAIN: &str = "text/plain";
39const CONTENT_TYPE: &str = "content-type";
40
41const STATUS_OK: u16 = 200;
42const STATUS_BAD_REQUEST: u16 = 400;
43const STATUS_NOT_FOUND: u16 = 404;
44const STATUS_METHOD_NOT_ALLOWED: u16 = 405;
45const STATUS_INTERNAL_SERVER_ERROR: u16 = 500;
46const STATUS_SERVICE_UNAVAILABLE: u16 = 503;
47
48const MAX_BLOB_LIMIT: usize = 128;
49const MAX_PAYLOAD_BYTES: u64 = 64 * 1024 * 1024;
50
51type EthEngineApi<Provider, Pool, Validator, ChainSpec> =
52    EngineApi<Provider, EthEngineTypes, Pool, Validator, ChainSpec>;
53type SharedEthEngineApi<Provider, Pool, Validator, ChainSpec> =
54    Arc<RwLock<Option<EthEngineApi<Provider, Pool, Validator, ChainSpec>>>>;
55
56/// Shared handle used by [`EngineSszProxyLayer`].
57pub struct EngineSszProxyHandle<ChainSpec, Provider = (), Pool = (), Validator = ()> {
58    engine_api: SharedEthEngineApi<Provider, Pool, Validator, ChainSpec>,
59}
60
61impl<C, Provider, Pool, Validator> Clone for EngineSszProxyHandle<C, Provider, Pool, Validator> {
62    fn clone(&self) -> Self {
63        Self { engine_api: self.engine_api.clone() }
64    }
65}
66
67impl<C, Provider, Pool, Validator> std::fmt::Debug
68    for EngineSszProxyHandle<C, Provider, Pool, Validator>
69{
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("EngineSszProxyHandle").finish_non_exhaustive()
72    }
73}
74
75impl<ChainSpec, Provider, Pool, Validator>
76    EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>
77{
78    fn new() -> Self {
79        Self { engine_api: Default::default() }
80    }
81
82    fn with_engine_api(engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>) -> Self {
83        Self { engine_api: Arc::new(RwLock::new(Some(engine_api))) }
84    }
85
86    /// Sets the Engine API implementation used by the proxy.
87    pub async fn set_engine_api(
88        &self,
89        engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
90    ) {
91        *self.engine_api.write().await = Some(engine_api);
92    }
93
94    /// Sets the Engine API implementation during synchronous launch wiring.
95    pub fn set_engine_api_sync(
96        &self,
97        engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
98    ) {
99        *self
100            .engine_api
101            .try_write()
102            .expect("engine api handle should not be locked during launch") = Some(engine_api);
103    }
104}
105
106impl<ChainSpec, Provider, Pool, Validator>
107    EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>
108{
109    /// Returns the Engine API implementation used by the proxy.
110    pub async fn engine_api(&self) -> Option<EthEngineApi<Provider, Pool, Validator, ChainSpec>> {
111        self.engine_api.read().await.clone()
112    }
113}
114
115/// A tower layer that intercepts SSZ Engine API routes under `/engine/v2`.
116#[derive(Clone, Debug)]
117pub struct EngineSszProxyLayer<ChainSpec, Provider = (), Pool = (), Validator = ()> {
118    handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
119}
120
121impl<ChainSpec, Provider, Pool, Validator>
122    EngineSszProxyLayer<ChainSpec, Provider, Pool, Validator>
123{
124    /// Creates a new proxy layer and a handle for setting the engine after node launch.
125    pub fn new() -> (Self, EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>) {
126        let handle = EngineSszProxyHandle::new();
127        (Self { handle: handle.clone() }, handle)
128    }
129
130    /// Creates a new proxy layer with an Engine API implementation.
131    pub fn with_engine_api(
132        engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
133    ) -> (Self, EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>) {
134        let handle = EngineSszProxyHandle::with_engine_api(engine_api);
135        (Self { handle: handle.clone() }, handle)
136    }
137}
138
139impl<S, ChainSpec, Provider, Pool, Validator> Layer<S>
140    for EngineSszProxyLayer<ChainSpec, Provider, Pool, Validator>
141{
142    type Service = EngineSszProxyService<S, ChainSpec, Provider, Pool, Validator>;
143
144    fn layer(&self, inner: S) -> Self::Service {
145        EngineSszProxyService { inner, handle: self.handle.clone() }
146    }
147}
148
149/// The service produced by [`EngineSszProxyLayer`].
150#[derive(Clone, Debug)]
151pub struct EngineSszProxyService<S, ChainSpec, Provider = (), Pool = (), Validator = ()> {
152    inner: S,
153    handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
154}
155
156impl<S, ChainSpec, Provider, Pool, Validator> Service<HttpRequest>
157    for EngineSszProxyService<S, ChainSpec, Provider, Pool, Validator>
158where
159    S: Service<HttpRequest, Response = HttpResponse, Error = BoxError> + Send + Clone,
160    S::Future: Send + 'static,
161    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
162    Pool: TransactionPool + 'static,
163    Validator: EngineApiValidator<EthEngineTypes>,
164    ChainSpec: EthereumHardforks + Send + Sync + 'static,
165{
166    type Response = HttpResponse;
167    type Error = BoxError;
168    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
169
170    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171        self.inner.poll_ready(cx)
172    }
173
174    fn call(&mut self, request: HttpRequest) -> Self::Future {
175        if !request.uri().path().starts_with("/engine/") {
176            let fut = self.inner.call(request);
177            return Box::pin(fut)
178        }
179
180        let handle = self.handle.clone();
181        Box::pin(async move { Ok(handle_engine_ssz_request(handle, request).await) })
182    }
183}
184
185async fn handle_engine_ssz_request<ChainSpec, Provider, Pool, Validator>(
186    handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
187    request: HttpRequest,
188) -> HttpResponse
189where
190    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
191    Pool: TransactionPool + 'static,
192    Validator: EngineApiValidator<EthEngineTypes>,
193    ChainSpec: EthereumHardforks + Send + Sync + 'static,
194{
195    let method = request.method().as_str().to_owned();
196    let path = request.uri().path().to_owned();
197    let Some(endpoint) = parse_engine_path(&path) else {
198        return text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint")
199    };
200
201    match endpoint {
202        EngineSszEndpoint::Capabilities => {
203            if method != "GET" {
204                return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
205            }
206            handle_capabilities()
207        }
208        EngineSszEndpoint::Identity => {
209            if method != "GET" {
210                return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
211            }
212            let Some(engine_api) = handle.engine_api().await else {
213                return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
214            };
215            handle_identity(engine_api)
216        }
217        EngineSszEndpoint::Payloads(fork) => {
218            if method != "POST" {
219                return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
220            }
221            let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
222                return text_response(STATUS_BAD_REQUEST, "failed to read request body")
223            };
224            let Some(engine_api) = handle.engine_api().await else {
225                return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
226            };
227            handle_new_payload(engine_api, fork.payloads_version(), &body).await
228        }
229        EngineSszEndpoint::Forkchoice(fork) => {
230            if method != "POST" {
231                return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
232            }
233            let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
234                return text_response(STATUS_BAD_REQUEST, "failed to read request body")
235            };
236            let Some(engine_api) = handle.engine_api().await else {
237                return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
238            };
239            handle_forkchoice_updated(engine_api, fork.forkchoice_version(), &body).await
240        }
241        EngineSszEndpoint::Blobs(version) => {
242            if method != "POST" {
243                return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
244            }
245            let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
246                return text_response(STATUS_BAD_REQUEST, "failed to read request body")
247            };
248            let Some(engine_api) = handle.engine_api().await else {
249                return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
250            };
251            handle_get_blobs(engine_api, version, &body).await
252        }
253    }
254}
255
256fn parse_engine_path(path: &str) -> Option<EngineSszEndpoint> {
257    let mut segments = path.trim_start_matches('/').split('/');
258    match (segments.next(), segments.next(), segments.next(), segments.next(), segments.next()) {
259        (Some("engine"), Some("v2"), Some("capabilities"), None, None) => {
260            Some(EngineSszEndpoint::Capabilities)
261        }
262        (Some("engine"), Some("v2"), Some("identity"), None, None) => {
263            Some(EngineSszEndpoint::Identity)
264        }
265        (Some("engine"), Some("v2"), Some(fork), Some("payloads"), None) => {
266            Some(EngineSszEndpoint::Payloads(fork.parse().ok()?))
267        }
268        (Some("engine"), Some("v2"), Some(fork), Some("forkchoice"), None) => {
269            Some(EngineSszEndpoint::Forkchoice(fork.parse().ok()?))
270        }
271        (Some("engine"), Some("v2"), Some("blobs"), version, None) => {
272            Some(EngineSszEndpoint::Blobs(parse_method_version(version?)?))
273        }
274        _ => None,
275    }
276}
277
278#[derive(Clone, Copy, Debug, Eq, PartialEq)]
279enum EngineSszEndpoint {
280    Capabilities,
281    Identity,
282    Payloads(EngineSszFork),
283    Forkchoice(EngineSszFork),
284    Blobs(u8),
285}
286
287#[derive(Clone, Copy, Debug, Eq, PartialEq)]
288enum EngineSszFork {
289    Paris,
290    Shanghai,
291    Cancun,
292    Prague,
293    Osaka,
294    Amsterdam,
295}
296
297impl EngineSszFork {
298    const fn payloads_version(self) -> u8 {
299        match self {
300            Self::Paris => 1,
301            Self::Shanghai => 2,
302            Self::Cancun => 3,
303            Self::Prague | Self::Osaka => 4,
304            Self::Amsterdam => 5,
305        }
306    }
307
308    const fn forkchoice_version(self) -> u8 {
309        match self {
310            Self::Paris => 1,
311            Self::Shanghai => 2,
312            Self::Cancun | Self::Prague | Self::Osaka => 3,
313            Self::Amsterdam => 4,
314        }
315    }
316}
317
318impl std::str::FromStr for EngineSszFork {
319    type Err = ();
320
321    fn from_str(value: &str) -> Result<Self, Self::Err> {
322        match value {
323            "paris" => Ok(Self::Paris),
324            "shanghai" => Ok(Self::Shanghai),
325            "cancun" => Ok(Self::Cancun),
326            "prague" => Ok(Self::Prague),
327            "osaka" => Ok(Self::Osaka),
328            "amsterdam" => Ok(Self::Amsterdam),
329            _ => Err(()),
330        }
331    }
332}
333
334fn parse_method_version(version: &str) -> Option<u8> {
335    version.strip_prefix('v')?.parse().ok().filter(|version| (1..=4).contains(version))
336}
337
338fn handle_capabilities() -> HttpResponse {
339    json_response(serde_json::json!({
340        "supported_forks": ["paris", "shanghai", "cancun", "prague", "osaka", "amsterdam"],
341        "fork_scoped_endpoints": ["payloads", "forkchoice", "bodies"],
342        "independently_versioned": {
343            "blobs": ["v1", "v2", "v3", "v4"],
344        },
345        "unscoped_endpoints": ["capabilities", "identity"],
346        "limits": {
347            "bodies.max_count": 128,
348            "blobs.max_versioned_hashes": MAX_BLOB_LIMIT,
349            "payload.max_bytes": MAX_PAYLOAD_BYTES,
350        },
351    }))
352}
353
354fn handle_identity<Provider, Pool, Validator, ChainSpec>(
355    engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
356) -> HttpResponse
357where
358    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
359    Pool: TransactionPool + 'static,
360    Validator: EngineApiValidator<EthEngineTypes>,
361    ChainSpec: EthereumHardforks + Send + Sync + 'static,
362{
363    json_response(vec![engine_api.client_version().clone()])
364}
365
366async fn handle_new_payload<Provider, Pool, Validator, ChainSpec>(
367    engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
368    version: u8,
369    body: &[u8],
370) -> HttpResponse
371where
372    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
373    Pool: TransactionPool + 'static,
374    Validator: EngineApiValidator<EthEngineTypes>,
375    ChainSpec: EthereumHardforks + Send + Sync + 'static,
376{
377    let payload = match decode_new_payload_request(version, body) {
378        Ok(payload) => payload,
379        Err(err) => return text_response(STATUS_BAD_REQUEST, err),
380    };
381
382    let response = match version {
383        1 => engine_api.new_payload_v1(payload).await,
384        2 => engine_api.new_payload_v2(payload).await,
385        3 => engine_api.new_payload_v3(payload).await,
386        4 => engine_api.new_payload_v4(payload).await,
387        5 => engine_api.new_payload_v5(payload).await,
388        _ => return text_response(STATUS_BAD_REQUEST, "unsupported payload endpoint version"),
389    };
390
391    match response {
392        Ok(status) => ssz_response(status),
393        Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
394    }
395}
396
397async fn handle_forkchoice_updated<Provider, Pool, Validator, ChainSpec>(
398    engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
399    version: u8,
400    body: &[u8],
401) -> HttpResponse
402where
403    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
404    Pool: TransactionPool + 'static,
405    Validator: EngineApiValidator<EthEngineTypes>,
406    ChainSpec: EthereumHardforks + Send + Sync + 'static,
407{
408    let (state, attrs, custody_columns) = match decode_forkchoice_request(version, body) {
409        Ok(request) => request,
410        Err(err) => return text_response(STATUS_BAD_REQUEST, err),
411    };
412
413    let response = match version {
414        1 => engine_api.fork_choice_updated_v1_metered(state, attrs).await,
415        2 => engine_api.fork_choice_updated_v2_metered(state, attrs).await,
416        3 => engine_api.fork_choice_updated_v3_metered(state, attrs).await,
417        4 => engine_api.fork_choice_updated_v4_metered(state, attrs, custody_columns).await,
418        _ => return text_response(STATUS_BAD_REQUEST, "unsupported forkchoice endpoint version"),
419    };
420
421    match response {
422        Ok(updated) => ssz_response(updated),
423        Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
424    }
425}
426
427/// Handles SSZ `engine_getBlobsV*` requests with the node's blob store.
428async fn handle_get_blobs<ChainSpec, Provider, Pool, Validator>(
429    engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
430    version: u8,
431    body: &[u8],
432) -> HttpResponse
433where
434    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
435    Pool: TransactionPool + 'static,
436    Validator: EngineApiValidator<EthEngineTypes>,
437    ChainSpec: EthereumHardforks + Send + Sync + 'static,
438{
439    match version {
440        1 => {
441            let hashes = match decode_blob_hashes_request(body) {
442                Ok(hashes) => hashes,
443                Err(err) => return text_response(STATUS_BAD_REQUEST, err),
444            };
445            match engine_api.get_blobs_v1_metered(hashes) {
446                Ok(response) => ssz_response(response),
447                Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
448            }
449        }
450        2 => {
451            let hashes = match decode_blob_hashes_request(body) {
452                Ok(hashes) => hashes,
453                Err(err) => return text_response(STATUS_BAD_REQUEST, err),
454            };
455            match engine_api.get_blobs_v2_metered(hashes) {
456                Ok(Some(response)) => ssz_response(response),
457                Ok(None) => no_content_response(),
458                Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
459            }
460        }
461        3 => {
462            let hashes = match decode_blob_hashes_request(body) {
463                Ok(hashes) => hashes,
464                Err(err) => return text_response(STATUS_BAD_REQUEST, err),
465            };
466            match engine_api.get_blobs_v3_metered(hashes) {
467                Ok(Some(response)) => ssz_response(response),
468                Ok(None) => no_content_response(),
469                Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
470            }
471        }
472        4 => {
473            let (hashes, indices_bitarray) = match decode_blob_cells_request(body) {
474                Ok(request) => request,
475                Err(err) => return text_response(STATUS_BAD_REQUEST, err),
476            };
477            match engine_api.get_blobs_v4_metered(hashes, indices_bitarray) {
478                Ok(Some(response)) => ssz_response(response),
479                Ok(None) => no_content_response(),
480                Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
481            }
482        }
483        _ => text_response(STATUS_NOT_FOUND, "unsupported blobs endpoint version"),
484    }
485}
486
487/// Decodes the common getBlobs request container with only versioned hashes.
488fn decode_blob_hashes_request(body: &[u8]) -> Result<Vec<B256>, &'static str> {
489    Vec::<B256>::from_ssz_bytes(body).map_err(|_| "invalid ssz")
490}
491
492/// Decodes the Amsterdam getBlobs request container with hashes and a cell index mask.
493fn decode_blob_cells_request(body: &[u8]) -> Result<(Vec<B256>, B128), &'static str> {
494    <(Vec<B256>, B128) as ssz::Decode>::from_ssz_bytes(body).map_err(|_| "invalid ssz")
495}
496
497fn decode_new_payload_request(version: u8, body: &[u8]) -> Result<ExecutionData, &'static str> {
498    match version {
499        1 => {
500            let execution_payload =
501                decode_one::<ExecutionPayloadV1>(body).map_err(|_| "invalid ssz")?;
502            Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
503        }
504        2 => {
505            let execution_payload =
506                decode_one::<ExecutionPayloadV2>(body).map_err(|_| "invalid ssz")?;
507            Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
508        }
509        3 => {
510            let (execution_payload, parent_beacon_block_root) =
511                <(ExecutionPayloadV3, B256)>::from_ssz_bytes(body).map_err(|_| "invalid ssz")?;
512            let versioned_hashes = calculate_versioned_hashes(
513                &execution_payload.payload_inner.payload_inner.transactions,
514            )?;
515            let sidecar = ExecutionPayloadSidecar::v3(CancunPayloadFields {
516                parent_beacon_block_root,
517                versioned_hashes,
518            });
519            Ok(ExecutionData::new(execution_payload.into(), sidecar))
520        }
521        4 => {
522            let (execution_payload, parent_beacon_block_root, execution_requests) =
523                <(ExecutionPayloadV3, B256, Vec<Bytes>)>::from_ssz_bytes(body)
524                    .map_err(|_| "invalid ssz")?;
525            let versioned_hashes = calculate_versioned_hashes(
526                &execution_payload.payload_inner.payload_inner.transactions,
527            )?;
528            let sidecar = ExecutionPayloadSidecar::v4(
529                CancunPayloadFields { parent_beacon_block_root, versioned_hashes },
530                PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
531                    execution_requests,
532                ))),
533            );
534            Ok(ExecutionData::new(execution_payload.into(), sidecar))
535        }
536        5 => {
537            let (execution_payload, parent_beacon_block_root, execution_requests) =
538                <(ExecutionPayloadV4, B256, Vec<Bytes>)>::from_ssz_bytes(body)
539                    .map_err(|_| "invalid ssz")?;
540            let versioned_hashes = calculate_versioned_hashes(
541                &execution_payload.payload_inner.payload_inner.payload_inner.transactions,
542            )?;
543            let sidecar = ExecutionPayloadSidecar::v4(
544                CancunPayloadFields { parent_beacon_block_root, versioned_hashes },
545                PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
546                    execution_requests,
547                ))),
548            );
549            Ok(ExecutionData::new(ExecutionPayload::V4(execution_payload), sidecar))
550        }
551        _ => Err("unsupported payload endpoint version"),
552    }
553}
554
555fn calculate_versioned_hashes(transactions: &[Bytes]) -> Result<Vec<B256>, &'static str> {
556    let mut versioned_hashes = Vec::new();
557    for transaction in transactions {
558        let transaction =
559            TxEnvelope::decode_2718_exact(transaction.as_ref()).map_err(|_| "invalid tx")?;
560        if let Some(hashes) = transaction.blob_versioned_hashes() {
561            versioned_hashes.extend_from_slice(hashes);
562        }
563    }
564
565    Ok(versioned_hashes)
566}
567
568fn decode_forkchoice_request(
569    version: u8,
570    body: &[u8],
571) -> Result<(ForkchoiceState, Option<PayloadAttributes>, Option<B128>), &'static str> {
572    match version {
573        1..=3 => {
574            let (forkchoice_state, payload_attributes) =
575                <(ForkchoiceState, Vec<PayloadAttributes>)>::from_ssz_bytes(body)
576                    .map_err(|_| "invalid ssz")?;
577            Ok((forkchoice_state, payload_attrs(version, payload_attributes)?, None))
578        }
579        4 => {
580            let (forkchoice_state, payload_attributes, custody_columns) =
581                <(ForkchoiceState, Vec<PayloadAttributes>, Vec<B128>)>::from_ssz_bytes(body)
582                    .map_err(|_| "invalid ssz")?;
583            Ok((
584                forkchoice_state,
585                payload_attrs(version, payload_attributes)?,
586                custody_columns_opt(custody_columns)?,
587            ))
588        }
589        _ => Err("unsupported forkchoice endpoint version"),
590    }
591}
592
593fn decode_one<T: ssz::Decode>(body: &[u8]) -> Result<T, ssz::DecodeError> {
594    let mut builder = ssz::SszDecoderBuilder::new(body);
595    builder.register_type::<T>()?;
596    let mut decoder = builder.build()?;
597    decoder.decode_next()
598}
599
600fn payload_attrs(
601    version: u8,
602    attrs: Vec<PayloadAttributes>,
603) -> Result<Option<PayloadAttributes>, &'static str> {
604    if attrs.len() > 1 {
605        return Err("payload_attributes must contain at most one value")
606    }
607
608    attrs.into_iter().next().map(|attrs| validate_payload_attrs_version(version, attrs)).transpose()
609}
610
611fn custody_columns_opt(custody_columns: Vec<B128>) -> Result<Option<B128>, &'static str> {
612    if custody_columns.len() > 1 {
613        return Err("invalid params")
614    }
615
616    Ok(custody_columns.into_iter().next())
617}
618
619fn validate_payload_attrs_version(
620    version: u8,
621    attrs: PayloadAttributes,
622) -> Result<PayloadAttributes, &'static str> {
623    let matches_version = match version {
624        1 => {
625            attrs.withdrawals.is_none() &&
626                attrs.parent_beacon_block_root.is_none() &&
627                attrs.slot_number.is_none()
628        }
629        2 => {
630            attrs.withdrawals.is_some() &&
631                attrs.parent_beacon_block_root.is_none() &&
632                attrs.slot_number.is_none()
633        }
634        3 => {
635            attrs.withdrawals.is_some() &&
636                attrs.parent_beacon_block_root.is_some() &&
637                attrs.slot_number.is_none()
638        }
639        4 => {
640            attrs.withdrawals.is_some() &&
641                attrs.parent_beacon_block_root.is_some() &&
642                attrs.slot_number.is_some()
643        }
644        _ => false,
645    };
646
647    if matches_version {
648        Ok(attrs)
649    } else {
650        Err("payload_attributes version does not match endpoint")
651    }
652}
653
654fn ssz_response<T: ssz::Encode>(value: T) -> HttpResponse {
655    HttpResponse::builder()
656        .status(STATUS_OK)
657        .header(CONTENT_TYPE, OCTET_STREAM)
658        .body(HttpBody::from(value.as_ssz_bytes()))
659        .expect("valid response")
660}
661
662fn json_response<T: serde::Serialize>(value: T) -> HttpResponse {
663    let Ok(body) = serde_json::to_string(&value) else {
664        return text_response(STATUS_INTERNAL_SERVER_ERROR, "failed to encode json")
665    };
666
667    HttpResponse::builder()
668        .status(STATUS_OK)
669        .header(CONTENT_TYPE, APPLICATION_JSON)
670        .body(HttpBody::from(body))
671        .expect("valid response")
672}
673
674fn no_content_response() -> HttpResponse {
675    HttpResponse::builder().status(204).body(HttpBody::empty()).expect("valid response")
676}
677
678fn text_response(status: u16, body: impl Into<String>) -> HttpResponse {
679    HttpResponse::builder()
680        .status(status)
681        .header(CONTENT_TYPE, TEXT_PLAIN)
682        .body(HttpBody::from(body.into()))
683        .expect("valid response")
684}
685
686#[cfg(test)]
687mod tests {
688    use super::*;
689    use ssz::Encode;
690
691    #[test]
692    fn parses_capabilities_endpoint() {
693        let endpoint = parse_engine_path("/engine/v2/capabilities").unwrap();
694        assert_eq!(endpoint, EngineSszEndpoint::Capabilities);
695    }
696
697    #[test]
698    fn parses_identity_endpoint() {
699        let endpoint = parse_engine_path("/engine/v2/identity").unwrap();
700        assert_eq!(endpoint, EngineSszEndpoint::Identity);
701    }
702
703    #[test]
704    fn parses_fork_scoped_payload_endpoint() {
705        let endpoint = parse_engine_path("/engine/v2/prague/payloads").unwrap();
706        assert_eq!(endpoint, EngineSszEndpoint::Payloads(EngineSszFork::Prague));
707    }
708
709    #[test]
710    fn parses_fork_scoped_forkchoice_endpoint() {
711        let endpoint = parse_engine_path("/engine/v2/amsterdam/forkchoice").unwrap();
712        assert_eq!(endpoint, EngineSszEndpoint::Forkchoice(EngineSszFork::Amsterdam));
713    }
714
715    #[test]
716    fn rejects_legacy_version_scoped_endpoint() {
717        assert!(parse_engine_path("/engine/v4/payloads").is_none());
718    }
719
720    #[test]
721    fn decodes_top_level_blob_hashes_request() {
722        let hashes = vec![B256::ZERO, B256::with_last_byte(1)];
723        let decoded = decode_blob_hashes_request(&hashes.as_ssz_bytes()).unwrap();
724        assert_eq!(decoded, hashes);
725    }
726
727    #[test]
728    fn decodes_forkchoice_v4_with_custody_columns() {
729        let forkchoice_state = ForkchoiceState {
730            head_block_hash: B256::ZERO,
731            safe_block_hash: B256::ZERO,
732            finalized_block_hash: B256::ZERO,
733        };
734        let encoded =
735            (forkchoice_state, Vec::<PayloadAttributes>::new(), vec![B128::with_last_byte(1)])
736                .as_ssz_bytes();
737
738        let (decoded_state, decoded_attrs, custody_columns) =
739            decode_forkchoice_request(4, &encoded).unwrap();
740        assert_eq!(decoded_state, forkchoice_state);
741        assert!(decoded_attrs.is_none());
742        assert_eq!(custody_columns, Some(B128::with_last_byte(1)));
743    }
744
745    #[test]
746    fn rejects_forkchoice_v4_with_multiple_custody_columns() {
747        let forkchoice_state = ForkchoiceState {
748            head_block_hash: B256::ZERO,
749            safe_block_hash: B256::ZERO,
750            finalized_block_hash: B256::ZERO,
751        };
752        let encoded = (
753            forkchoice_state,
754            Vec::<PayloadAttributes>::new(),
755            vec![B128::ZERO, B128::with_last_byte(1)],
756        )
757            .as_ssz_bytes();
758
759        let err = decode_forkchoice_request(4, &encoded).unwrap_err();
760        assert_eq!(err, "invalid params");
761    }
762}