1use alloy_eips::eip7685::{Requests, RequestsOrHash};
8use alloy_primitives::{Bytes, B256};
9use alloy_rpc_types_engine::{
10 CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadSidecar,
11 ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ExecutionPayloadV4,
12 ForkchoiceState, PayloadAttributes, PraguePayloadFields,
13};
14use http_body_util::BodyExt;
15use jsonrpsee::server::{HttpBody, HttpRequest, HttpResponse};
16use reth_engine_primitives::ConsensusEngineHandle;
17use reth_ethereum_engine_primitives::EthEngineTypes;
18use std::{
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23};
24use tokio::sync::RwLock;
25use tower::{BoxError, Layer, Service};
26
/// `Content-Type` value attached to SSZ-encoded response bodies.
const OCTET_STREAM: &str = "application/octet-stream";
/// `Content-Type` value attached to plain-text (error) response bodies.
const TEXT_PLAIN: &str = "text/plain";
/// Name of the header carrying the response content type.
const CONTENT_TYPE: &str = "content-type";

/// HTTP status for a successfully handled request.
const STATUS_OK: u16 = 200;
/// HTTP status for a request whose body could not be read or decoded.
const STATUS_BAD_REQUEST: u16 = 400;
/// HTTP status for a path that does not name a known engine SSZ endpoint.
const STATUS_NOT_FOUND: u16 = 404;
/// HTTP status for a request that does not use `POST`.
const STATUS_METHOD_NOT_ALLOWED: u16 = 405;
/// HTTP status for a failure reported by the consensus engine.
const STATUS_INTERNAL_SERVER_ERROR: u16 = 500;
/// HTTP status returned while no engine handle has been installed.
const STATUS_SERVICE_UNAVAILABLE: u16 = 503;
37
38#[derive(Clone, Debug, Default)]
40pub struct EngineSszProxyHandle {
41 engine: Arc<RwLock<Option<ConsensusEngineHandle<EthEngineTypes>>>>,
42}
43
44impl EngineSszProxyHandle {
45 pub async fn set_engine(&self, engine: ConsensusEngineHandle<EthEngineTypes>) {
47 *self.engine.write().await = Some(engine);
48 }
49
50 async fn engine(&self) -> Option<ConsensusEngineHandle<EthEngineTypes>> {
51 self.engine.read().await.clone()
52 }
53}
54
55#[derive(Clone, Debug, Default)]
57pub struct EngineSszProxyLayer {
58 handle: EngineSszProxyHandle,
59}
60
61impl EngineSszProxyLayer {
62 pub fn new() -> (Self, EngineSszProxyHandle) {
64 let handle = EngineSszProxyHandle::default();
65 (Self { handle: handle.clone() }, handle)
66 }
67}
68
69impl<S> Layer<S> for EngineSszProxyLayer {
70 type Service = EngineSszProxyService<S>;
71
72 fn layer(&self, inner: S) -> Self::Service {
73 EngineSszProxyService { inner, handle: self.handle.clone() }
74 }
75}
76
77#[derive(Clone, Debug)]
79pub struct EngineSszProxyService<S> {
80 inner: S,
81 handle: EngineSszProxyHandle,
82}
83
84impl<S> Service<HttpRequest> for EngineSszProxyService<S>
85where
86 S: Service<HttpRequest, Response = HttpResponse, Error = BoxError> + Send + Clone,
87 S::Future: Send + 'static,
88{
89 type Response = HttpResponse;
90 type Error = BoxError;
91 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
92
93 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
94 self.inner.poll_ready(cx)
95 }
96
97 fn call(&mut self, request: HttpRequest) -> Self::Future {
98 if !request.uri().path().starts_with("/engine/") {
99 let fut = self.inner.call(request);
100 return Box::pin(fut)
101 }
102
103 let handle = self.handle.clone();
104 Box::pin(async move { Ok(handle_engine_ssz_request(handle, request).await) })
105 }
106}
107
108async fn handle_engine_ssz_request(
109 handle: EngineSszProxyHandle,
110 request: HttpRequest,
111) -> HttpResponse {
112 if request.method().as_str() != "POST" {
113 return text_response(STATUS_METHOD_NOT_ALLOWED, "method not allowed")
114 }
115
116 let path = request.uri().path().to_owned();
117 let Some((version, resource)) = parse_engine_path(&path) else {
118 return text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint")
119 };
120
121 let Ok(body) = request.into_body().collect().await.map(|body| body.to_bytes()) else {
122 return text_response(STATUS_BAD_REQUEST, "failed to read request body")
123 };
124
125 let Some(engine) = handle.engine().await else {
126 return text_response(STATUS_SERVICE_UNAVAILABLE, "engine handle unavailable")
127 };
128
129 match resource {
130 "payloads" => handle_new_payload(engine, version, &body).await,
131 "forkchoice" => handle_forkchoice_updated(engine, version, &body).await,
132 _ => text_response(STATUS_NOT_FOUND, "unknown engine ssz endpoint"),
133 }
134}
135
/// Splits an engine SSZ request path of the form `/engine/v{N}/{resource}` into its numeric
/// version and its resource segment.
///
/// Returns `None` when the path does not consist of exactly three segments, when the first
/// segment is not `engine`, or when the version segment is not `v` followed by ASCII decimal
/// digits whose value fits in a `u8`.
fn parse_engine_path(path: &str) -> Option<(u8, &str)> {
    let mut segments = path.trim_start_matches('/').split('/');
    match (segments.next(), segments.next(), segments.next(), segments.next()) {
        (Some("engine"), Some(version), Some(resource), None) => {
            let digits = version.strip_prefix('v')?;
            // Integer parsing also accepts a leading `+`, which would let `/engine/v+1/...`
            // alias `/engine/v1/...`; only plain digits form a valid version.
            if !digits.bytes().all(|byte| byte.is_ascii_digit()) {
                return None
            }
            // An empty digit string and values above `u8::MAX` are rejected by `parse`.
            let version: u8 = digits.parse().ok()?;
            Some((version, resource))
        }
        _ => None,
    }
}
146
147async fn handle_new_payload(
148 engine: ConsensusEngineHandle<EthEngineTypes>,
149 version: u8,
150 body: &[u8],
151) -> HttpResponse {
152 let payload = match decode_new_payload_request(version, body) {
153 Ok(payload) => payload,
154 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
155 };
156
157 match engine.new_payload(payload).await {
158 Ok(status) => ssz_response(status),
159 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
160 }
161}
162
163async fn handle_forkchoice_updated(
164 engine: ConsensusEngineHandle<EthEngineTypes>,
165 version: u8,
166 body: &[u8],
167) -> HttpResponse {
168 let (state, attrs) = match decode_forkchoice_request(version, body) {
169 Ok(request) => request,
170 Err(err) => return text_response(STATUS_BAD_REQUEST, err),
171 };
172
173 match engine.fork_choice_updated(state, attrs).await {
174 Ok(updated) => ssz_response(updated),
175 Err(err) => text_response(STATUS_INTERNAL_SERVER_ERROR, err.to_string()),
176 }
177}
178
179fn decode_new_payload_request(version: u8, body: &[u8]) -> Result<ExecutionData, &'static str> {
180 use ssz::Decode;
181
182 match version {
183 1 => {
184 let execution_payload =
185 decode_one::<ExecutionPayloadV1>(body).map_err(|_| "invalid ssz")?;
186 Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
187 }
188 2 => {
189 let execution_payload =
190 decode_one::<ExecutionPayloadV2>(body).map_err(|_| "invalid ssz")?;
191 Ok(ExecutionData::new(execution_payload.into(), ExecutionPayloadSidecar::none()))
192 }
193 3 => {
194 let (execution_payload, expected_blob_versioned_hashes, parent_beacon_block_root) =
195 <(ExecutionPayloadV3, Vec<B256>, B256)>::from_ssz_bytes(body)
196 .map_err(|_| "invalid ssz")?;
197 let sidecar = ExecutionPayloadSidecar::v3(CancunPayloadFields {
198 parent_beacon_block_root,
199 versioned_hashes: expected_blob_versioned_hashes,
200 });
201 Ok(ExecutionData::new(execution_payload.into(), sidecar))
202 }
203 4 => {
204 let (
205 execution_payload,
206 expected_blob_versioned_hashes,
207 parent_beacon_block_root,
208 execution_requests,
209 ) = <(ExecutionPayloadV3, Vec<B256>, B256, Vec<Bytes>)>::from_ssz_bytes(body)
210 .map_err(|_| "invalid ssz")?;
211 let sidecar = ExecutionPayloadSidecar::v4(
212 CancunPayloadFields {
213 parent_beacon_block_root,
214 versioned_hashes: expected_blob_versioned_hashes,
215 },
216 PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
217 execution_requests,
218 ))),
219 );
220 Ok(ExecutionData::new(execution_payload.into(), sidecar))
221 }
222 5 => {
223 let (
224 execution_payload,
225 expected_blob_versioned_hashes,
226 parent_beacon_block_root,
227 execution_requests,
228 ) = <(ExecutionPayloadV4, Vec<B256>, B256, Vec<Bytes>)>::from_ssz_bytes(body)
229 .map_err(|_| "invalid ssz")?;
230 let sidecar = ExecutionPayloadSidecar::v4(
231 CancunPayloadFields {
232 parent_beacon_block_root,
233 versioned_hashes: expected_blob_versioned_hashes,
234 },
235 PraguePayloadFields::new(RequestsOrHash::Requests(Requests::new(
236 execution_requests,
237 ))),
238 );
239 Ok(ExecutionData::new(ExecutionPayload::V4(execution_payload), sidecar))
240 }
241 _ => Err("unsupported payload endpoint version"),
242 }
243}
244
245fn decode_forkchoice_request(
246 version: u8,
247 body: &[u8],
248) -> Result<(ForkchoiceState, Option<PayloadAttributes>), &'static str> {
249 use ssz::Decode;
250
251 match version {
252 1..=4 => {
253 let (forkchoice_state, payload_attributes) =
254 <(ForkchoiceState, Vec<PayloadAttributes>)>::from_ssz_bytes(body)
255 .map_err(|_| "invalid ssz")?;
256 Ok((forkchoice_state, payload_attrs(version, payload_attributes)?))
257 }
258 _ => Err("unsupported forkchoice endpoint version"),
259 }
260}
261
262fn decode_one<T: ssz::Decode>(body: &[u8]) -> Result<T, ssz::DecodeError> {
263 let mut builder = ssz::SszDecoderBuilder::new(body);
264 builder.register_type::<T>()?;
265 let mut decoder = builder.build()?;
266 decoder.decode_next()
267}
268
269fn payload_attrs(
270 version: u8,
271 attrs: Vec<PayloadAttributes>,
272) -> Result<Option<PayloadAttributes>, &'static str> {
273 if attrs.len() > 1 {
274 return Err("payload_attributes must contain at most one value")
275 }
276
277 attrs.into_iter().next().map(|attrs| validate_payload_attrs_version(version, attrs)).transpose()
278}
279
280fn validate_payload_attrs_version(
281 version: u8,
282 attrs: PayloadAttributes,
283) -> Result<PayloadAttributes, &'static str> {
284 let matches_version = match version {
285 1 => {
286 attrs.withdrawals.is_none() &&
287 attrs.parent_beacon_block_root.is_none() &&
288 attrs.slot_number.is_none()
289 }
290 2 => {
291 attrs.withdrawals.is_some() &&
292 attrs.parent_beacon_block_root.is_none() &&
293 attrs.slot_number.is_none()
294 }
295 3 => {
296 attrs.withdrawals.is_some() &&
297 attrs.parent_beacon_block_root.is_some() &&
298 attrs.slot_number.is_none()
299 }
300 4 => {
301 attrs.withdrawals.is_some() &&
302 attrs.parent_beacon_block_root.is_some() &&
303 attrs.slot_number.is_some()
304 }
305 _ => false,
306 };
307
308 if matches_version {
309 Ok(attrs)
310 } else {
311 Err("payload_attributes version does not match endpoint")
312 }
313}
314
315fn ssz_response<T: ssz::Encode>(value: T) -> HttpResponse {
316 HttpResponse::builder()
317 .status(STATUS_OK)
318 .header(CONTENT_TYPE, OCTET_STREAM)
319 .body(HttpBody::from(value.as_ssz_bytes()))
320 .expect("valid response")
321}
322
323fn text_response(status: u16, body: impl Into<String>) -> HttpResponse {
324 HttpResponse::builder()
325 .status(status)
326 .header(CONTENT_TYPE, TEXT_PLAIN)
327 .body(HttpBody::from(body.into()))
328 .expect("valid response")
329}