Skip to main content

reth_node_ethereum/
engine_ssz_proxy.rs

1//! HTTP SSZ transport proxy for the authenticated Engine API server.
2//!
3//! Implements the [EIP-8178] SSZ Engine API routes under `/engine`.
4//!
5//! [EIP-8178]: https://eips.ethereum.org/EIPS/eip-8178
6
7use alloy_eips::eip7685::{Requests, RequestsOrHash};
8use alloy_primitives::{Bytes, B256};
9use alloy_rpc_types_engine::{
10    CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadSidecar,
11    ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ExecutionPayloadV4,
12    ForkchoiceState, PayloadAttributes, PraguePayloadFields,
13};
14use http_body_util::BodyExt;
15use jsonrpsee::server::{HttpBody, HttpRequest, HttpResponse};
16use reth_engine_primitives::ConsensusEngineHandle;
17use reth_ethereum_engine_primitives::EthEngineTypes;
18use std::{
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23};
24use tokio::sync::RwLock;
25use tower::{BoxError, Layer, Service};
26
/// `Content-Type` value attached to SSZ-encoded response bodies.
const OCTET_STREAM: &str = "application/octet-stream";
/// `Content-Type` value attached to plain-text (error) response bodies.
const TEXT_PLAIN: &str = "text/plain";
/// Name of the header that carries the response media type.
const CONTENT_TYPE: &str = "content-type";

/// Status for a request the engine processed and answered.
const STATUS_OK: u16 = 200;
/// Status for an unreadable body or a body that fails to decode or validate.
const STATUS_BAD_REQUEST: u16 = 400;
/// Status for a path that does not name a known SSZ endpoint.
const STATUS_NOT_FOUND: u16 = 404;
/// Status for any HTTP method other than `POST`.
const STATUS_METHOD_NOT_ALLOWED: u16 = 405;
/// Status for an error returned by the consensus engine.
const STATUS_INTERNAL_SERVER_ERROR: u16 = 500;
/// Status returned while no engine handle has been set on the proxy.
const STATUS_SERVICE_UNAVAILABLE: u16 = 503;
37
38/// Shared handle used by [`EngineSszProxyLayer`].
39#[derive(Clone, Debug, Default)]
40pub struct EngineSszProxyHandle {
41    engine: Arc<RwLock<Option<ConsensusEngineHandle<EthEngineTypes>>>>,
42}
43
44impl EngineSszProxyHandle {
45    /// Sets the consensus engine handle used by the proxy.
46    pub async fn set_engine(&self, engine: ConsensusEngineHandle<EthEngineTypes>) {
47        *self.engine.write().await = Some(engine);
48    }
49
50    async fn engine(&self) -> Option<ConsensusEngineHandle<EthEngineTypes>> {
51        self.engine.read().await.clone()
52    }
53}
54
55/// A tower layer that intercepts SSZ Engine API routes under `/engine`.
56#[derive(Clone, Debug, Default)]
57pub struct EngineSszProxyLayer {
58    handle: EngineSszProxyHandle,
59}
60
61impl EngineSszProxyLayer {
62    /// Creates a new proxy layer and a handle for setting the engine after node launch.
63    pub fn new() -> (Self, EngineSszProxyHandle) {
64        let handle = EngineSszProxyHandle::default();
65        (Self { handle: handle.clone() }, handle)
66    }
67}
68
69impl<S> Layer<S> for EngineSszProxyLayer {
70    type Service = EngineSszProxyService<S>;
71
72    fn layer(&self, inner: S) -> Self::Service {
73        EngineSszProxyService { inner, handle: self.handle.clone() }
74    }
75}
76
77/// The service produced by [`EngineSszProxyLayer`].
78#[derive(Clone, Debug)]
79pub struct EngineSszProxyService<S> {
80    inner: S,
81    handle: EngineSszProxyHandle,
82}
83
84impl<S> Service<HttpRequest> for EngineSszProxyService<S>
85where
86    S: Service<HttpRequest, Response = HttpResponse, Error = BoxError> + Send + Clone,
87    S::Future: Send + 'static,
88{
89    type Response = HttpResponse;
90    type Error = BoxError;
91    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
92
93    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94        self.inner.poll_ready(cx)
95    }
96
97    fn call(&mut self, request: HttpRequest) -> Self::Future {
98        if !request.uri().path().starts_with("/engine/") {
99            let fut = self.inner.call(request);
100            return Box::pin(fut)
101        }
102
103        let handle = self.handle.clone();
104        Box::pin(async move { Ok(handle_engine_ssz_request(handle, request).await) })
105    }
106}
107
108async fn handle_engine_ssz_request(
109    handle: EngineSszProxyHandle,
110    request: HttpRequest,
111) -> HttpResponse {
112    if request.method().as_str() != "POST" {
113        return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
114    }
115
116    let path = request.uri().path().to_owned();
117    let Some((version, resource)) = parse_engine_path(&path) else {
118        return text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint")
119    };
120
121    let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
122        return text_response(STATUS_BAD_REQUEST, "failed to read request body")
123    };
124
125    let Some(engine) = handle.engine().await else {
126        return text_response(STATUS_SERVICE_UNAVAILABLE, "engine handle unavailable")
127    };
128
129    match resource {
130        "payloads" => handle_new_payload(engine, version, &body).await,
131        "forkchoice" => handle_forkchoice_updated(engine, version, &body).await,
132        _ => text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint"),
133    }
134}
135
/// Splits a path of the form `/engine/v{N}/{resource}` into `(N, resource)`.
///
/// Returns `None` when the path has a different number of segments, does not start with
/// `engine`, or its version segment is not `v` followed by a canonical decimal `u8`.
///
/// The version must be written canonically: ASCII digits only, no sign and no leading zeros.
/// `u8::from_str` on its own also accepts forms such as `+1` and `001`, which would make
/// several distinct URLs alias the same endpoint.
fn parse_engine_path(path: &str) -> Option<(u8, &str)> {
    let mut segments = path.trim_start_matches('/').split('/');
    let (Some("engine"), Some(version), Some(resource), None) =
        (segments.next(), segments.next(), segments.next(), segments.next())
    else {
        return None
    };

    let digits = version.strip_prefix('v')?;
    let has_leading_zero = digits.len() > 1 && digits.starts_with('0');
    if has_leading_zero || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
        return None
    }

    // An empty or out-of-range digit string is rejected by `parse` itself.
    let version = digits.parse().ok()?;
    Some((version, resource))
}
146
147async fn handle_new_payload(
148    engine: ConsensusEngineHandle<EthEngineTypes>,
149    version: u8,
150    body: &[u8],
151) -> HttpResponse {
152    let payload = match decode_new_payload_request(version, body) {
153        Ok(payload) => payload,
154        Err(err) => return text_response(STATUS_BAD_REQUEST, err),
155    };
156
157    match engine.new_payload(payload).await {
158        Ok(status) => ssz_response(status),
159        Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
160    }
161}
162
163async fn handle_forkchoice_updated(
164    engine: ConsensusEngineHandle<EthEngineTypes>,
165    version: u8,
166    body: &[u8],
167) -> HttpResponse {
168    let (state, attrs) = match decode_forkchoice_request(version, body) {
169        Ok(request) => request,
170        Err(err) => return text_response(STATUS_BAD_REQUEST, err),
171    };
172
173    match engine.fork_choice_updated(state, attrs).await {
174        Ok(updated) => ssz_response(updated),
175        Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
176    }
177}
178
179fn decode_new_payload_request(version: u8, body: &[u8]) -> Result<ExecutionData, &'static str> {
180    use ssz::Decode;
181
182    match version {
183        1 => {
184            let execution_payload =
185                decode_one::<ExecutionPayloadV1>(body).map_err(|_| "invalid ssz")?;
186            Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
187        }
188        2 => {
189            let execution_payload =
190                decode_one::<ExecutionPayloadV2>(body).map_err(|_| "invalid ssz")?;
191            Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
192        }
193        3 => {
194            let (execution_payload, expected_blob_versioned_hashes, parent_beacon_block_root) =
195                <(ExecutionPayloadV3, Vec<B256>, B256)>::from_ssz_bytes(body)
196                    .map_err(|_| "invalid ssz")?;
197            let sidecar = ExecutionPayloadSidecar::v3(CancunPayloadFields {
198                parent_beacon_block_root,
199                versioned_hashes: expected_blob_versioned_hashes,
200            });
201            Ok(ExecutionData::new(execution_payload.into(), sidecar))
202        }
203        4 => {
204            let (
205                execution_payload,
206                expected_blob_versioned_hashes,
207                parent_beacon_block_root,
208                execution_requests,
209            ) = <(ExecutionPayloadV3, Vec<B256>, B256, Vec<Bytes>)>::from_ssz_bytes(body)
210                .map_err(|_| "invalid ssz")?;
211            let sidecar = ExecutionPayloadSidecar::v4(
212                CancunPayloadFields {
213                    parent_beacon_block_root,
214                    versioned_hashes: expected_blob_versioned_hashes,
215                },
216                PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
217                    execution_requests,
218                ))),
219            );
220            Ok(ExecutionData::new(execution_payload.into(), sidecar))
221        }
222        5 => {
223            let (
224                execution_payload,
225                expected_blob_versioned_hashes,
226                parent_beacon_block_root,
227                execution_requests,
228            ) = <(ExecutionPayloadV4, Vec<B256>, B256, Vec<Bytes>)>::from_ssz_bytes(body)
229                .map_err(|_| "invalid ssz")?;
230            let sidecar = ExecutionPayloadSidecar::v4(
231                CancunPayloadFields {
232                    parent_beacon_block_root,
233                    versioned_hashes: expected_blob_versioned_hashes,
234                },
235                PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
236                    execution_requests,
237                ))),
238            );
239            Ok(ExecutionData::new(ExecutionPayload::V4(execution_payload), sidecar))
240        }
241        _ => Err("unsupported payload endpoint version"),
242    }
243}
244
245fn decode_forkchoice_request(
246    version: u8,
247    body: &[u8],
248) -> Result<(ForkchoiceState, Option<PayloadAttributes>), &'static str> {
249    use ssz::Decode;
250
251    match version {
252        1..=4 => {
253            let (forkchoice_state, payload_attributes) =
254                <(ForkchoiceState, Vec<PayloadAttributes>)>::from_ssz_bytes(body)
255                    .map_err(|_| "invalid ssz")?;
256            Ok((forkchoice_state, payload_attrs(version, payload_attributes)?))
257        }
258        _ => Err("unsupported forkchoice endpoint version"),
259    }
260}
261
262fn decode_one<T: ssz::Decode>(body: &[u8]) -> Result<T, ssz::DecodeError> {
263    let mut builder = ssz::SszDecoderBuilder::new(body);
264    builder.register_type::<T>()?;
265    let mut decoder = builder.build()?;
266    decoder.decode_next()
267}
268
269fn payload_attrs(
270    version: u8,
271    attrs: Vec<PayloadAttributes>,
272) -> Result<Option<PayloadAttributes>, &'static str> {
273    if attrs.len() > 1 {
274        return Err("payload_attributes must contain at most one value")
275    }
276
277    attrs.into_iter().next().map(|attrs| validate_payload_attrs_version(version, attrs)).transpose()
278}
279
280fn validate_payload_attrs_version(
281    version: u8,
282    attrs: PayloadAttributes,
283) -> Result<PayloadAttributes, &'static str> {
284    let matches_version = match version {
285        1 => {
286            attrs.withdrawals.is_none() &&
287                attrs.parent_beacon_block_root.is_none() &&
288                attrs.slot_number.is_none()
289        }
290        2 => {
291            attrs.withdrawals.is_some() &&
292                attrs.parent_beacon_block_root.is_none() &&
293                attrs.slot_number.is_none()
294        }
295        3 => {
296            attrs.withdrawals.is_some() &&
297                attrs.parent_beacon_block_root.is_some() &&
298                attrs.slot_number.is_none()
299        }
300        4 => {
301            attrs.withdrawals.is_some() &&
302                attrs.parent_beacon_block_root.is_some() &&
303                attrs.slot_number.is_some()
304        }
305        _ => false,
306    };
307
308    if matches_version {
309        Ok(attrs)
310    } else {
311        Err("payload_attributes version does not match endpoint")
312    }
313}
314
315fn ssz_response<T: ssz::Encode>(value: T) -> HttpResponse {
316    HttpResponse::builder()
317        .status(STATUS_OK)
318        .header(CONTENT_TYPE, OCTET_STREAM)
319        .body(HttpBody::from(value.as_ssz_bytes()))
320        .expect("valid response")
321}
322
323fn text_response(status: u16, body: impl Into<String>) -> HttpResponse {
324    HttpResponse::builder()
325        .status(status)
326        .header(CONTENT_TYPE, TEXT_PLAIN)
327        .body(HttpBody::from(body.into()))
328        .expect("valid response")
329}