1use alloy_consensus::{Transaction, TxEnvelope};
8use alloy_eips::{
9 eip2718::Decodable2718,
10 eip7685::{Requests, RequestsOrHash},
11};
12use alloy_primitives::{Bytes, B128, B256};
13use alloy_rpc_types_engine::{
14 CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadSidecar,
15 ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ExecutionPayloadV4,
16 ForkchoiceState, PayloadAttributes, PraguePayloadFields,
17};
18use http_body_util::BodyExt;
19use jsonrpsee::server::{HttpBody, HttpRequest, HttpResponse};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::EngineApiValidator;
22use reth_ethereum_engine_primitives::EthEngineTypes;
23use reth_provider::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
24use reth_rpc::EngineApi;
25use reth_transaction_pool::TransactionPool;
26use ssz::Decode;
27use std::{
28 future::Future,
29 pin::Pin,
30 sync::Arc,
31 task::{Context, Poll},
32};
33use tokio::sync::RwLock;
34use tower::{BoxError, Layer, Service};
35
/// `Content-Type` value attached to SSZ-encoded responses.
const OCTET_STREAM: &str = "application/octet-stream";
/// `Content-Type` value attached to JSON responses.
const APPLICATION_JSON: &str = "application/json";
/// `Content-Type` value attached to plain-text responses.
const TEXT_PLAIN: &str = "text/plain";
/// Name of the HTTP header carrying the response media type.
const CONTENT_TYPE: &str = "content-type";

/// HTTP status used for successful SSZ and JSON responses.
const STATUS_OK: u16 = 200;
/// HTTP status used when a request body cannot be read or decoded.
const STATUS_BAD_REQUEST: u16 = 400;
/// HTTP status used when the path does not name a known engine SSZ endpoint.
const STATUS_NOT_FOUND: u16 = 404;
/// HTTP status used when an endpoint is called with the wrong HTTP method.
const STATUS_METHOD_NOT_ALLOWED: u16 = 405;
/// HTTP status used when the engine API returns an error.
const STATUS_INTERNAL_SERVER_ERROR: u16 = 500;
/// HTTP status used while no engine API instance has been installed.
const STATUS_SERVICE_UNAVAILABLE: u16 = 503;

/// Maximum number of blob versioned hashes, as advertised by the capabilities endpoint.
const MAX_BLOB_LIMIT: usize = 128;
/// Maximum payload size in bytes (64 MiB), as advertised by the capabilities endpoint.
const MAX_PAYLOAD_BYTES: u64 = 64 * 1024 * 1024;
50
/// [`EngineApi`] specialised to the Ethereum engine types.
type EthEngineApi<Provider, Pool, Validator, ChainSpec> =
    EngineApi<Provider, EthEngineTypes, Pool, Validator, ChainSpec>;
/// Reference-counted, lock-protected slot that may or may not hold an [`EthEngineApi`] yet.
type SharedEthEngineApi<Provider, Pool, Validator, ChainSpec> =
    Arc<RwLock<Option<EthEngineApi<Provider, Pool, Validator, ChainSpec>>>>;
55
/// Cloneable handle to the engine API slot used by the SSZ proxy.
///
/// All clones share the same slot, so an engine API installed through one clone becomes
/// visible to every other clone.
pub struct EngineSszProxyHandle<ChainSpec, Provider = (), Pool = (), Validator = ()> {
    /// Shared slot; holds `None` until an engine API has been installed.
    engine_api: SharedEthEngineApi<Provider, Pool, Validator, ChainSpec>,
}
60
61impl<C, Provider, Pool, Validator> Clone for EngineSszProxyHandle<C, Provider, Pool, Validator> {
62 fn clone(&self) -> Self {
63 Self { engine_api: self.engine_api.clone() }
64 }
65}
66
67impl<C, Provider, Pool, Validator> std::fmt::Debug
68 for EngineSszProxyHandle<C, Provider, Pool, Validator>
69{
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("EngineSszProxyHandle").finish_non_exhaustive()
72 }
73}
74
75impl<ChainSpec, Provider, Pool, Validator>
76 EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>
77{
78 fn new() -> Self {
79 Self { engine_api: Default::default() }
80 }
81
82 fn with_engine_api(engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>) -> Self {
83 Self { engine_api: Arc::new(RwLock::new(Some(engine_api))) }
84 }
85
86 pub async fn set_engine_api(
88 &self,
89 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
90 ) {
91 *self.engine_api.write().await = Some(engine_api);
92 }
93
94 pub fn set_engine_api_sync(
96 &self,
97 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
98 ) {
99 *self
100 .engine_api
101 .try_write()
102 .expect("engine api handle should not be locked during launch") = Some(engine_api);
103 }
104}
105
106impl<ChainSpec, Provider, Pool, Validator>
107 EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>
108{
109 pub async fn engine_api(&self) -> Option<EthEngineApi<Provider, Pool, Validator, ChainSpec>> {
111 self.engine_api.read().await.clone()
112 }
113}
114
115#[derive(Clone, Debug)]
117pub struct EngineSszProxyLayer<ChainSpec, Provider = (), Pool = (), Validator = ()> {
118 handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
119}
120
121impl<ChainSpec, Provider, Pool, Validator>
122 EngineSszProxyLayer<ChainSpec, Provider, Pool, Validator>
123{
124 pub fn new() -> (Self, EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>) {
126 let handle = EngineSszProxyHandle::new();
127 (Self { handle: handle.clone() }, handle)
128 }
129
130 pub fn with_engine_api(
132 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
133 ) -> (Self, EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>) {
134 let handle = EngineSszProxyHandle::with_engine_api(engine_api);
135 (Self { handle: handle.clone() }, handle)
136 }
137}
138
139impl<S, ChainSpec, Provider, Pool, Validator> Layer<S>
140 for EngineSszProxyLayer<ChainSpec, Provider, Pool, Validator>
141{
142 type Service = EngineSszProxyService<S, ChainSpec, Provider, Pool, Validator>;
143
144 fn layer(&self, inner: S) -> Self::Service {
145 EngineSszProxyService { inner, handle: self.handle.clone() }
146 }
147}
148
149#[derive(Clone, Debug)]
151pub struct EngineSszProxyService<S, ChainSpec, Provider = (), Pool = (), Validator = ()> {
152 inner: S,
153 handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
154}
155
156impl<S, ChainSpec, Provider, Pool, Validator> Service<HttpRequest>
157 for EngineSszProxyService<S, ChainSpec, Provider, Pool, Validator>
158where
159 S: Service<HttpRequest, Response = HttpResponse, Error = BoxError> + Send + Clone,
160 S::Future: Send + 'static,
161 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
162 Pool: TransactionPool + 'static,
163 Validator: EngineApiValidator<EthEngineTypes>,
164 ChainSpec: EthereumHardforks + Send + Sync + 'static,
165{
166 type Response = HttpResponse;
167 type Error = BoxError;
168 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
169
170 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171 self.inner.poll_ready(cx)
172 }
173
174 fn call(&mut self, request: HttpRequest) -> Self::Future {
175 if !request.uri().path().starts_with("/engine/") {
176 let fut = self.inner.call(request);
177 return Box::pin(fut)
178 }
179
180 let handle = self.handle.clone();
181 Box::pin(async move { Ok(handle_engine_ssz_request(handle, request).await) })
182 }
183}
184
185async fn handle_engine_ssz_request<ChainSpec, Provider, Pool, Validator>(
186 handle: EngineSszProxyHandle<ChainSpec, Provider, Pool, Validator>,
187 request: HttpRequest,
188) -> HttpResponse
189where
190 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
191 Pool: TransactionPool + 'static,
192 Validator: EngineApiValidator<EthEngineTypes>,
193 ChainSpec: EthereumHardforks + Send + Sync + 'static,
194{
195 let method = request.method().as_str().to_owned();
196 let path = request.uri().path().to_owned();
197 let Some(endpoint) = parse_engine_path(&path) else {
198 return text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint")
199 };
200
201 match endpoint {
202 EngineSszEndpoint::Capabilities => {
203 if method != "GET" {
204 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
205 }
206 handle_capabilities()
207 }
208 EngineSszEndpoint::Identity => {
209 if method != "GET" {
210 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
211 }
212 let Some(engine_api) = handle.engine_api().await else {
213 return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
214 };
215 handle_identity(engine_api)
216 }
217 EngineSszEndpoint::Payloads(fork) => {
218 if method != "POST" {
219 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
220 }
221 let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
222 return text_response(STATUS_BAD_REQUEST, "failed to read request body")
223 };
224 let Some(engine_api) = handle.engine_api().await else {
225 return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
226 };
227 handle_new_payload(engine_api, fork.payloads_version(), &body).await
228 }
229 EngineSszEndpoint::Forkchoice(fork) => {
230 if method != "POST" {
231 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
232 }
233 let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
234 return text_response(STATUS_BAD_REQUEST, "failed to read request body")
235 };
236 let Some(engine_api) = handle.engine_api().await else {
237 return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
238 };
239 handle_forkchoice_updated(engine_api, fork.forkchoice_version(), &body).await
240 }
241 EngineSszEndpoint::Blobs(version) => {
242 if method != "POST" {
243 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
244 }
245 let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
246 return text_response(STATUS_BAD_REQUEST, "failed to read request body")
247 };
248 let Some(engine_api) = handle.engine_api().await else {
249 return text_response(STATUS_SERVICE_UNAVAILABLE, "engine api unavailable")
250 };
251 handle_get_blobs(engine_api, version, &body).await
252 }
253 }
254}
255
256fn parse_engine_path(path: &str) -> Option<EngineSszEndpoint> {
257 let mut segments = path.trim_start_matches('/').split('/');
258 match (segments.next(), segments.next(), segments.next(), segments.next(), segments.next()) {
259 (Some("engine"), Some("v2"), Some("capabilities"), None, None) => {
260 Some(EngineSszEndpoint::Capabilities)
261 }
262 (Some("engine"), Some("v2"), Some("identity"), None, None) => {
263 Some(EngineSszEndpoint::Identity)
264 }
265 (Some("engine"), Some("v2"), Some(fork), Some("payloads"), None) => {
266 Some(EngineSszEndpoint::Payloads(fork.parse().ok()?))
267 }
268 (Some("engine"), Some("v2"), Some(fork), Some("forkchoice"), None) => {
269 Some(EngineSszEndpoint::Forkchoice(fork.parse().ok()?))
270 }
271 (Some("engine"), Some("v2"), Some("blobs"), version, None) => {
272 Some(EngineSszEndpoint::Blobs(parse_method_version(version?)?))
273 }
274 _ => None,
275 }
276}
277
/// Endpoint identified by the path of an engine SSZ request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum EngineSszEndpoint {
    /// `engine/v2/capabilities`.
    Capabilities,
    /// `engine/v2/identity`.
    Identity,
    /// `engine/v2/<fork>/payloads`, carrying the fork named in the path.
    Payloads(EngineSszFork),
    /// `engine/v2/<fork>/forkchoice`, carrying the fork named in the path.
    Forkchoice(EngineSszFork),
    /// `engine/v2/blobs/<version>`, carrying the numeric version from the path.
    Blobs(u8),
}
286
/// Fork names accepted in fork-scoped endpoint paths.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum EngineSszFork {
    Paris,
    Shanghai,
    Cancun,
    Prague,
    Osaka,
    Amsterdam,
}

impl EngineSszFork {
    /// Version of the new-payload method used for this fork.
    const fn payloads_version(self) -> u8 {
        match self {
            Self::Amsterdam => 5,
            Self::Osaka | Self::Prague => 4,
            Self::Cancun => 3,
            Self::Shanghai => 2,
            Self::Paris => 1,
        }
    }

    /// Version of the forkchoice-updated method used for this fork.
    const fn forkchoice_version(self) -> u8 {
        match self {
            Self::Amsterdam => 4,
            Self::Osaka | Self::Prague | Self::Cancun => 3,
            Self::Shanghai => 2,
            Self::Paris => 1,
        }
    }
}

impl std::str::FromStr for EngineSszFork {
    type Err = ();

    /// Parses the exact lowercase fork name; any other input is an error.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        const BY_NAME: [(&str, EngineSszFork); 6] = [
            ("paris", EngineSszFork::Paris),
            ("shanghai", EngineSszFork::Shanghai),
            ("cancun", EngineSszFork::Cancun),
            ("prague", EngineSszFork::Prague),
            ("osaka", EngineSszFork::Osaka),
            ("amsterdam", EngineSszFork::Amsterdam),
        ];

        BY_NAME.iter().find(|(name, _)| *name == value).map(|&(_, fork)| fork).ok_or(())
    }
}
333
/// Parses a path segment of the form `v<N>` into the method version `N`.
///
/// Only the canonical spellings `v1` through `v4` are accepted. Returns `None` when the `v`
/// prefix is missing, the number is out of range, or the number is not written canonically.
fn parse_method_version(version: &str) -> Option<u8> {
    let digits = version.strip_prefix('v')?;

    // `u8::from_str` also accepts a leading `+` and leading zeros, which would let several
    // distinct paths (`v+1`, `v01`, ...) alias the same endpoint. Reject those up front.
    if digits.starts_with('0') || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
        return None
    }

    digits.parse().ok().filter(|version| (1..=4).contains(version))
}
337
338fn handle_capabilities() -> HttpResponse {
339 json_response(serde_json::json!({
340 "supported_forks": ["paris", "shanghai", "cancun", "prague", "osaka", "amsterdam"],
341 "fork_scoped_endpoints": ["payloads", "forkchoice", "bodies"],
342 "independently_versioned": {
343 "blobs": ["v1", "v2", "v3", "v4"],
344 },
345 "unscoped_endpoints": ["capabilities", "identity"],
346 "limits": {
347 "bodies.max_count": 128,
348 "blobs.max_versioned_hashes": MAX_BLOB_LIMIT,
349 "payload.max_bytes": MAX_PAYLOAD_BYTES,
350 },
351 }))
352}
353
354fn handle_identity<Provider, Pool, Validator, ChainSpec>(
355 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
356) -> HttpResponse
357where
358 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
359 Pool: TransactionPool + 'static,
360 Validator: EngineApiValidator<EthEngineTypes>,
361 ChainSpec: EthereumHardforks + Send + Sync + 'static,
362{
363 json_response(vec![engine_api.client_version().clone()])
364}
365
366async fn handle_new_payload<Provider, Pool, Validator, ChainSpec>(
367 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
368 version: u8,
369 body: &[u8],
370) -> HttpResponse
371where
372 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
373 Pool: TransactionPool + 'static,
374 Validator: EngineApiValidator<EthEngineTypes>,
375 ChainSpec: EthereumHardforks + Send + Sync + 'static,
376{
377 let payload = match decode_new_payload_request(version, body) {
378 Ok(payload) => payload,
379 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
380 };
381
382 let response = match version {
383 1 => engine_api.new_payload_v1(payload).await,
384 2 => engine_api.new_payload_v2(payload).await,
385 3 => engine_api.new_payload_v3(payload).await,
386 4 => engine_api.new_payload_v4(payload).await,
387 5 => engine_api.new_payload_v5(payload).await,
388 _ => return text_response(STATUS_BAD_REQUEST, "unsupported payload endpoint version"),
389 };
390
391 match response {
392 Ok(status) => ssz_response(status),
393 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
394 }
395}
396
397async fn handle_forkchoice_updated<Provider, Pool, Validator, ChainSpec>(
398 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
399 version: u8,
400 body: &[u8],
401) -> HttpResponse
402where
403 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
404 Pool: TransactionPool + 'static,
405 Validator: EngineApiValidator<EthEngineTypes>,
406 ChainSpec: EthereumHardforks + Send + Sync + 'static,
407{
408 let (state, attrs, custody_columns) = match decode_forkchoice_request(version, body) {
409 Ok(request) => request,
410 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
411 };
412
413 let response = match version {
414 1 => engine_api.fork_choice_updated_v1_metered(state, attrs).await,
415 2 => engine_api.fork_choice_updated_v2_metered(state, attrs).await,
416 3 => engine_api.fork_choice_updated_v3_metered(state, attrs).await,
417 4 => engine_api.fork_choice_updated_v4_metered(state, attrs, custody_columns).await,
418 _ => return text_response(STATUS_BAD_REQUEST, "unsupported forkchoice endpoint version"),
419 };
420
421 match response {
422 Ok(updated) => ssz_response(updated),
423 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
424 }
425}
426
427async fn handle_get_blobs<ChainSpec, Provider, Pool, Validator>(
429 engine_api: EthEngineApi<Provider, Pool, Validator, ChainSpec>,
430 version: u8,
431 body: &[u8],
432) -> HttpResponse
433where
434 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
435 Pool: TransactionPool + 'static,
436 Validator: EngineApiValidator<EthEngineTypes>,
437 ChainSpec: EthereumHardforks + Send + Sync + 'static,
438{
439 match version {
440 1 => {
441 let hashes = match decode_blob_hashes_request(body) {
442 Ok(hashes) => hashes,
443 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
444 };
445 match engine_api.get_blobs_v1_metered(hashes) {
446 Ok(response) => ssz_response(response),
447 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
448 }
449 }
450 2 => {
451 let hashes = match decode_blob_hashes_request(body) {
452 Ok(hashes) => hashes,
453 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
454 };
455 match engine_api.get_blobs_v2_metered(hashes) {
456 Ok(Some(response)) => ssz_response(response),
457 Ok(None) => no_content_response(),
458 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
459 }
460 }
461 3 => {
462 let hashes = match decode_blob_hashes_request(body) {
463 Ok(hashes) => hashes,
464 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
465 };
466 match engine_api.get_blobs_v3_metered(hashes) {
467 Ok(Some(response)) => ssz_response(response),
468 Ok(None) => no_content_response(),
469 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
470 }
471 }
472 4 => {
473 let (hashes, indices_bitarray) = match decode_blob_cells_request(body) {
474 Ok(request) => request,
475 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
476 };
477 match engine_api.get_blobs_v4_metered(hashes, indices_bitarray) {
478 Ok(Some(response)) => ssz_response(response),
479 Ok(None) => no_content_response(),
480 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
481 }
482 }
483 _ => text_response(STATUS_NOT_FOUND, "unsupported blobs endpoint version"),
484 }
485}
486
487fn decode_blob_hashes_request(body: &[u8]) -> Result<Vec<B256>, &'static str> {
489 Vec::<B256>::from_ssz_bytes(body).map_err(|_| "invalid ssz")
490}
491
492fn decode_blob_cells_request(body: &[u8]) -> Result<(Vec<B256>, B128), &'static str> {
494 <(Vec<B256>, B128) as ssz::Decode>::from_ssz_bytes(body).map_err(|_| "invalid ssz")
495}
496
497fn decode_new_payload_request(version: u8, body: &[u8]) -> Result<ExecutionData, &'static str> {
498 match version {
499 1 => {
500 let execution_payload =
501 decode_one::<ExecutionPayloadV1>(body).map_err(|_| "invalid ssz")?;
502 Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
503 }
504 2 => {
505 let execution_payload =
506 decode_one::<ExecutionPayloadV2>(body).map_err(|_| "invalid ssz")?;
507 Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
508 }
509 3 => {
510 let (execution_payload, parent_beacon_block_root) =
511 <(ExecutionPayloadV3, B256)>::from_ssz_bytes(body).map_err(|_| "invalid ssz")?;
512 let versioned_hashes = calculate_versioned_hashes(
513 &execution_payload.payload_inner.payload_inner.transactions,
514 )?;
515 let sidecar = ExecutionPayloadSidecar::v3(CancunPayloadFields {
516 parent_beacon_block_root,
517 versioned_hashes,
518 });
519 Ok(ExecutionData::new(execution_payload.into(), sidecar))
520 }
521 4 => {
522 let (execution_payload, parent_beacon_block_root, execution_requests) =
523 <(ExecutionPayloadV3, B256, Vec<Bytes>)>::from_ssz_bytes(body)
524 .map_err(|_| "invalid ssz")?;
525 let versioned_hashes = calculate_versioned_hashes(
526 &execution_payload.payload_inner.payload_inner.transactions,
527 )?;
528 let sidecar = ExecutionPayloadSidecar::v4(
529 CancunPayloadFields { parent_beacon_block_root, versioned_hashes },
530 PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
531 execution_requests,
532 ))),
533 );
534 Ok(ExecutionData::new(execution_payload.into(), sidecar))
535 }
536 5 => {
537 let (execution_payload, parent_beacon_block_root, execution_requests) =
538 <(ExecutionPayloadV4, B256, Vec<Bytes>)>::from_ssz_bytes(body)
539 .map_err(|_| "invalid ssz")?;
540 let versioned_hashes = calculate_versioned_hashes(
541 &execution_payload.payload_inner.payload_inner.payload_inner.transactions,
542 )?;
543 let sidecar = ExecutionPayloadSidecar::v4(
544 CancunPayloadFields { parent_beacon_block_root, versioned_hashes },
545 PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
546 execution_requests,
547 ))),
548 );
549 Ok(ExecutionData::new(ExecutionPayload::V4(execution_payload), sidecar))
550 }
551 _ => Err("unsupported payload endpoint version"),
552 }
553}
554
555fn calculate_versioned_hashes(transactions: &[Bytes]) -> Result<Vec<B256>, &'static str> {
556 let mut versioned_hashes = Vec::new();
557 for transaction in transactions {
558 let transaction =
559 TxEnvelope::decode_2718_exact(transaction.as_ref()).map_err(|_| "invalid tx")?;
560 if let Some(hashes) = transaction.blob_versioned_hashes() {
561 versioned_hashes.extend_from_slice(hashes);
562 }
563 }
564
565 Ok(versioned_hashes)
566}
567
568fn decode_forkchoice_request(
569 version: u8,
570 body: &[u8],
571) -> Result<(ForkchoiceState, Option<PayloadAttributes>, Option<B128>), &'static str> {
572 match version {
573 1..=3 => {
574 let (forkchoice_state, payload_attributes) =
575 <(ForkchoiceState, Vec<PayloadAttributes>)>::from_ssz_bytes(body)
576 .map_err(|_| "invalid ssz")?;
577 Ok((forkchoice_state, payload_attrs(version, payload_attributes)?, None))
578 }
579 4 => {
580 let (forkchoice_state, payload_attributes, custody_columns) =
581 <(ForkchoiceState, Vec<PayloadAttributes>, Vec<B128>)>::from_ssz_bytes(body)
582 .map_err(|_| "invalid ssz")?;
583 Ok((
584 forkchoice_state,
585 payload_attrs(version, payload_attributes)?,
586 custody_columns_opt(custody_columns)?,
587 ))
588 }
589 _ => Err("unsupported forkchoice endpoint version"),
590 }
591}
592
593fn decode_one<T: ssz::Decode>(body: &[u8]) -> Result<T, ssz::DecodeError> {
594 let mut builder = ssz::SszDecoderBuilder::new(body);
595 builder.register_type::<T>()?;
596 let mut decoder = builder.build()?;
597 decoder.decode_next()
598}
599
600fn payload_attrs(
601 version: u8,
602 attrs: Vec<PayloadAttributes>,
603) -> Result<Option<PayloadAttributes>, &'static str> {
604 if attrs.len() > 1 {
605 return Err("payload_attributes must contain at most one value")
606 }
607
608 attrs.into_iter().next().map(|attrs| validate_payload_attrs_version(version, attrs)).transpose()
609}
610
611fn custody_columns_opt(custody_columns: Vec<B128>) -> Result<Option<B128>, &'static str> {
612 if custody_columns.len() > 1 {
613 return Err("invalid params")
614 }
615
616 Ok(custody_columns.into_iter().next())
617}
618
619fn validate_payload_attrs_version(
620 version: u8,
621 attrs: PayloadAttributes,
622) -> Result<PayloadAttributes, &'static str> {
623 let matches_version = match version {
624 1 => {
625 attrs.withdrawals.is_none() &&
626 attrs.parent_beacon_block_root.is_none() &&
627 attrs.slot_number.is_none()
628 }
629 2 => {
630 attrs.withdrawals.is_some() &&
631 attrs.parent_beacon_block_root.is_none() &&
632 attrs.slot_number.is_none()
633 }
634 3 => {
635 attrs.withdrawals.is_some() &&
636 attrs.parent_beacon_block_root.is_some() &&
637 attrs.slot_number.is_none()
638 }
639 4 => {
640 attrs.withdrawals.is_some() &&
641 attrs.parent_beacon_block_root.is_some() &&
642 attrs.slot_number.is_some()
643 }
644 _ => false,
645 };
646
647 if matches_version {
648 Ok(attrs)
649 } else {
650 Err("payload_attributes version does not match endpoint")
651 }
652}
653
654fn ssz_response<T: ssz::Encode>(value: T) -> HttpResponse {
655 HttpResponse::builder()
656 .status(STATUS_OK)
657 .header(CONTENT_TYPE, OCTET_STREAM)
658 .body(HttpBody::from(value.as_ssz_bytes()))
659 .expect("valid response")
660}
661
662fn json_response<T: serde::Serialize>(value: T) -> HttpResponse {
663 let Ok(body) = serde_json::to_string(&value) else {
664 return text_response(STATUS_INTERNAL_SERVER_ERROR, "failed to encode json")
665 };
666
667 HttpResponse::builder()
668 .status(STATUS_OK)
669 .header(CONTENT_TYPE, APPLICATION_JSON)
670 .body(HttpBody::from(body))
671 .expect("valid response")
672}
673
674fn no_content_response() -> HttpResponse {
675 HttpResponse::builder().status(204).body(HttpBody::empty()).expect("valid response")
676}
677
678fn text_response(status: u16, body: impl Into<String>) -> HttpResponse {
679 HttpResponse::builder()
680 .status(status)
681 .header(CONTENT_TYPE, TEXT_PLAIN)
682 .body(HttpBody::from(body.into()))
683 .expect("valid response")
684}
685
/// Unit tests for path parsing and SSZ request decoding.
#[cfg(test)]
mod tests {
    use super::*;
    use ssz::Encode;

    /// The unscoped capabilities path is recognised.
    #[test]
    fn parses_capabilities_endpoint() {
        let endpoint = parse_engine_path("/engine/v2/capabilities").unwrap();
        assert_eq!(endpoint, EngineSszEndpoint::Capabilities);
    }

    /// The unscoped identity path is recognised.
    #[test]
    fn parses_identity_endpoint() {
        let endpoint = parse_engine_path("/engine/v2/identity").unwrap();
        assert_eq!(endpoint, EngineSszEndpoint::Identity);
    }

    /// A payloads path carries the fork named in its third segment.
    #[test]
    fn parses_fork_scoped_payload_endpoint() {
        let endpoint = parse_engine_path("/engine/v2/prague/payloads").unwrap();
        assert_eq!(endpoint, EngineSszEndpoint::Payloads(EngineSszFork::Prague));
    }

    /// A forkchoice path carries the fork named in its third segment.
    #[test]
    fn parses_fork_scoped_forkchoice_endpoint() {
        let endpoint = parse_engine_path("/engine/v2/amsterdam/forkchoice").unwrap();
        assert_eq!(endpoint, EngineSszEndpoint::Forkchoice(EngineSszFork::Amsterdam));
    }

    /// Paths that put a method version where `v2` is expected are not routed.
    #[test]
    fn rejects_legacy_version_scoped_endpoint() {
        assert!(parse_engine_path("/engine/v4/payloads").is_none());
    }

    /// A bare SSZ list of hashes round-trips through the blob hashes decoder.
    #[test]
    fn decodes_top_level_blob_hashes_request() {
        let hashes = vec![B256::ZERO, B256::with_last_byte(1)];
        let decoded = decode_blob_hashes_request(&hashes.as_ssz_bytes()).unwrap();
        assert_eq!(decoded, hashes);
    }

    /// A v4 forkchoice request with no attributes and one custody column entry decodes to
    /// `None` attributes and `Some` custody columns.
    #[test]
    fn decodes_forkchoice_v4_with_custody_columns() {
        let forkchoice_state = ForkchoiceState {
            head_block_hash: B256::ZERO,
            safe_block_hash: B256::ZERO,
            finalized_block_hash: B256::ZERO,
        };
        let encoded =
            (forkchoice_state, Vec::<PayloadAttributes>::new(), vec![B128::with_last_byte(1)])
                .as_ssz_bytes();

        let (decoded_state, decoded_attrs, custody_columns) =
            decode_forkchoice_request(4, &encoded).unwrap();
        assert_eq!(decoded_state, forkchoice_state);
        assert!(decoded_attrs.is_none());
        assert_eq!(custody_columns, Some(B128::with_last_byte(1)));
    }

    /// More than one custody column entry is rejected, since the list encodes an optional
    /// value.
    #[test]
    fn rejects_forkchoice_v4_with_multiple_custody_columns() {
        let forkchoice_state = ForkchoiceState {
            head_block_hash: B256::ZERO,
            safe_block_hash: B256::ZERO,
            finalized_block_hash: B256::ZERO,
        };
        let encoded = (
            forkchoice_state,
            Vec::<PayloadAttributes>::new(),
            vec![B128::ZERO, B128::with_last_byte(1)],
        )
            .as_ssz_bytes();

        let err = decode_forkchoice_request(4, &encoded).unwrap_err();
        assert_eq!(err, "invalid params");
    }
}